From 6bb6071ec1dce0cce59df0cb1c38168918061041 Mon Sep 17 00:00:00 2001 From: nsfisis Date: Wed, 7 Aug 2024 22:14:51 +0900 Subject: refactor(backend): move ownership of channel to send task results --- backend/taskqueue/processor.go | 35 +++++++++++++++++++++++++---------- 1 file changed, 25 insertions(+), 10 deletions(-) (limited to 'backend/taskqueue/processor.go') diff --git a/backend/taskqueue/processor.go b/backend/taskqueue/processor.go index 1d4c412..ca180e9 100644 --- a/backend/taskqueue/processor.go +++ b/backend/taskqueue/processor.go @@ -14,14 +14,14 @@ import ( ) type ExecProcessor struct { - q *db.Queries - c chan string + q *db.Queries + results chan TaskExecResult } -func NewExecProcessor(q *db.Queries, c chan string) *ExecProcessor { +func NewExecProcessor(q *db.Queries, results chan TaskExecResult) *ExecProcessor { return &ExecProcessor{ - q: q, - c: c, + q: q, + results: results, } } @@ -80,7 +80,10 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { if err != nil { return fmt.Errorf("CreateTestcaseExecution failed: %v", err) } - p.c <- "compile_error" + p.results <- TaskExecResult{ + Task: &payload, + Result: "compile_error", + } return fmt.Errorf("swiftc failed: %v", resData.Stderr) } } @@ -121,7 +124,10 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { if err != nil { return fmt.Errorf("CreateTestcaseExecution failed: %v", err) } - p.c <- "compile_error" + p.results <- TaskExecResult{ + Task: &payload, + Result: "compile_error", + } return fmt.Errorf("wasmc failed: %v", resData.Stderr) } } @@ -170,7 +176,10 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { if err != nil { return fmt.Errorf("CreateTestcaseExecution failed: %v", err) } - p.c <- resData.Result + p.results <- TaskExecResult{ + Task: &payload, + Result: resData.Result, + } return fmt.Errorf("testrun failed: %v", resData.Stderr) } if !isTestcaseExecutionCorrect(testcase.Stdout, resData.Stdout) { @@ -184,12 +193,18 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { if err != nil { return fmt.Errorf("CreateTestcaseExecution failed: %v", err) } - p.c <- "wrong_answer" + p.results <- TaskExecResult{ + Task: &payload, + Result: "wrong_answer", + } return fmt.Errorf("testrun failed: %v", resData.Stdout) } } - p.c <- "success" + p.results <- TaskExecResult{ + Task: &payload, + Result: "success", + } return nil } -- cgit v1.2.3-70-g09d2 From d799993a4fe0d05aa72c9a48d4a76c6db2240038 Mon Sep 17 00:00:00 2001 From: nsfisis Date: Wed, 7 Aug 2024 22:44:08 +0900 Subject: feat(backend): make testcase_executions.testcase_id non-null --- backend/db/models.go | 2 +- backend/db/query.sql.go | 2 +- backend/schema.sql | 2 +- backend/taskqueue/processor.go | 4 ++-- 4 files changed, 5 insertions(+), 5 deletions(-) (limited to 'backend/taskqueue/processor.go') diff --git a/backend/db/models.go b/backend/db/models.go index 431d860..126b5f2 100644 --- a/backend/db/models.go +++ b/backend/db/models.go @@ -49,7 +49,7 @@ type Testcase struct { type TestcaseExecution struct { TestcaseExecutionID int32 SubmissionID int32 - TestcaseID *int32 + TestcaseID int32 Status string Stdout string Stderr string diff --git a/backend/db/query.sql.go b/backend/db/query.sql.go index 18acfda..eefa8a0 100644 --- a/backend/db/query.sql.go +++ b/backend/db/query.sql.go @@ -43,7 +43,7 @@ VALUES ($1, $2, $3, $4, $5) type CreateTestcaseExecutionParams struct { SubmissionID int32 - TestcaseID *int32 + TestcaseID int32 Status string Stdout string Stderr string diff --git a/backend/schema.sql b/backend/schema.sql index d0b6c40..27a63d1 100644 --- a/backend/schema.sql +++ b/backend/schema.sql @@ -67,7 +67,7 @@ CREATE TABLE submissions ( CREATE TABLE testcase_executions ( testcase_execution_id SERIAL PRIMARY KEY, submission_id INT NOT NULL, - testcase_id INT, + testcase_id INT NOT NULL, status VARCHAR(16) NOT NULL, stdout TEXT NOT NULL, stderr TEXT NOT NULL, diff --git a/backend/taskqueue/processor.go b/backend/taskqueue/processor.go index ca180e9..098b565 100644 --- a/backend/taskqueue/processor.go +++ b/backend/taskqueue/processor.go @@ -168,7 +168,7 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { if resData.Result != "success" { err := p.q.CreateTestcaseExecution(ctx, db.CreateTestcaseExecutionParams{ SubmissionID: submissionID, - TestcaseID: &testcase.TestcaseID, + TestcaseID: testcase.TestcaseID, Status: resData.Result, Stdout: resData.Stdout, Stderr: resData.Stderr, @@ -185,7 +185,7 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { if !isTestcaseExecutionCorrect(testcase.Stdout, resData.Stdout) { err := p.q.CreateTestcaseExecution(ctx, db.CreateTestcaseExecutionParams{ SubmissionID: submissionID, - TestcaseID: &testcase.TestcaseID, + TestcaseID: testcase.TestcaseID, Status: "wrong_answer", Stdout: resData.Stdout, Stderr: resData.Stderr, -- cgit v1.2.3-70-g09d2 From 5a2217f7a9d01b925c3db6491a363e6e4db9ca42 Mon Sep 17 00:00:00 2001 From: nsfisis Date: Wed, 7 Aug 2024 22:49:33 +0900 Subject: feat(backend): rename testcase_executions to testcase_results --- backend/db/models.go | 14 +++++++------- backend/db/query.sql.go | 10 +++++----- backend/game/hub.go | 12 ++++++------ backend/query.sql | 4 ++-- backend/schema.sql | 16 ++++++++-------- backend/taskqueue/processor.go | 20 ++++++++++---------- 6 files changed, 38 insertions(+), 38 deletions(-) (limited to 'backend/taskqueue/processor.go') diff --git a/backend/db/models.go b/backend/db/models.go index 126b5f2..a8729e4 100644 --- a/backend/db/models.go +++ b/backend/db/models.go @@ -46,13 +46,13 @@ type Testcase struct { Stdout string } -type TestcaseExecution struct { - TestcaseExecutionID int32 - SubmissionID int32 - TestcaseID int32 - Status string - Stdout string - Stderr string +type TestcaseResult struct { + TestcaseResultID int32 + SubmissionID int32 + TestcaseID int32 + Status string + Stdout string + Stderr string } type User struct { diff --git a/backend/db/query.sql.go b/backend/db/query.sql.go index eefa8a0..6c78267 100644 --- a/backend/db/query.sql.go +++ b/backend/db/query.sql.go @@ -36,12 +36,12 @@ func (q *Queries) CreateSubmission(ctx context.Context, arg CreateSubmissionPara return submission_id, err } -const createTestcaseExecution = `-- name: CreateTestcaseExecution :exec -INSERT INTO testcase_executions (submission_id, testcase_id, status, stdout, stderr) +const createTestcaseResult = `-- name: CreateTestcaseResult :exec +INSERT INTO testcase_results (submission_id, testcase_id, status, stdout, stderr) VALUES ($1, $2, $3, $4, $5) ` -type CreateTestcaseExecutionParams struct { +type CreateTestcaseResultParams struct { SubmissionID int32 TestcaseID int32 Status string @@ -49,8 +49,8 @@ type CreateTestcaseExecutionParams struct { Stderr string } -func (q *Queries) CreateTestcaseExecution(ctx context.Context, arg CreateTestcaseExecutionParams) error { - _, err := q.db.Exec(ctx, createTestcaseExecution, +func (q *Queries) CreateTestcaseResult(ctx context.Context, arg CreateTestcaseResultParams) error { + _, err := q.db.Exec(ctx, createTestcaseResult, arg.SubmissionID, arg.TestcaseID, arg.Status, diff --git a/backend/game/hub.go b/backend/game/hub.go index 605edf5..ef69ddb 100644 --- a/backend/game/hub.go +++ b/backend/game/hub.go @@ -33,7 +33,7 @@ type gameHub struct { watchers map[*watcherClient]bool registerWatcher chan *watcherClient unregisterWatcher chan *watcherClient - testcaseExecution chan taskqueue.TaskExecResult + taskResults chan taskqueue.TaskExecResult } func newGameHub(ctx context.Context, game *game, q *db.Queries, taskQueue *taskqueue.Queue) *gameHub { @@ -49,7 +49,7 @@ func newGameHub(ctx context.Context, game *game, q *db.Queries, taskQueue *taskq watchers: make(map[*watcherClient]bool), registerWatcher: make(chan *watcherClient), unregisterWatcher: make(chan *watcherClient), - testcaseExecution: make(chan taskqueue.TaskExecResult), + taskResults: make(chan taskqueue.TaskExecResult), } } @@ -200,16 +200,16 @@ func (hub *gameHub) run() { } func (hub *gameHub) processTaskResults() { - for executionResult := range hub.testcaseExecution { + for taskResult := range hub.taskResults { for player := range hub.players { - if player.playerID != executionResult.Task.UserID { + if player.playerID != taskResult.Task.UserID { continue } player.s2cMessages <- &playerMessageS2CExecResult{ Type: playerMessageTypeS2CExecResult, Data: playerMessageS2CExecResultPayload{ Score: nil, - Status: api.GamePlayerMessageS2CExecResultPayloadStatus(executionResult.Result), + Status: api.GamePlayerMessageS2CExecResultPayloadStatus(taskResult.Result), }, } } @@ -344,7 +344,7 @@ func (hubs *GameHubs) Run() { log.Printf("no such game: %d", taskResult.Task.GameID) continue } - hub.testcaseExecution <- taskResult + hub.taskResults <- taskResult } } diff --git a/backend/query.sql b/backend/query.sql index ea04c08..493fdb8 100644 --- a/backend/query.sql +++ b/backend/query.sql @@ -68,6 +68,6 @@ SELECT * FROM testcases WHERE testcases.problem_id = (SELECT problem_id FROM games WHERE game_id = $1) ORDER BY testcases.testcase_id; --- name: CreateTestcaseExecution :exec -INSERT INTO testcase_executions (submission_id, testcase_id, status, stdout, stderr) +-- name: CreateTestcaseResult :exec +INSERT INTO testcase_results (submission_id, testcase_id, status, stdout, stderr) VALUES ($1, $2, $3, $4, $5); diff --git a/backend/schema.sql b/backend/schema.sql index 27a63d1..3cc7ed9 100644 --- a/backend/schema.sql +++ b/backend/schema.sql @@ -64,14 +64,14 @@ CREATE TABLE submissions ( CONSTRAINT fk_user_id FOREIGN KEY(user_id) REFERENCES users(user_id) ); -CREATE TABLE testcase_executions ( - testcase_execution_id SERIAL PRIMARY KEY, - submission_id INT NOT NULL, - testcase_id INT NOT NULL, - status VARCHAR(16) NOT NULL, - stdout TEXT NOT NULL, - stderr TEXT NOT NULL, +CREATE TABLE testcase_results ( + testcase_result_id SERIAL PRIMARY KEY, + submission_id INT NOT NULL, + testcase_id INT NOT NULL, + status VARCHAR(16) NOT NULL, + stdout TEXT NOT NULL, + stderr TEXT NOT NULL, CONSTRAINT fk_submission_id FOREIGN KEY(submission_id) REFERENCES submissions(submission_id), CONSTRAINT fk_testcase_id FOREIGN KEY(testcase_id) REFERENCES testcases(testcase_id) ); -CREATE INDEX idx_testcase_executions_submission_id ON testcase_executions(submission_id); +CREATE INDEX idx_testcase_results_submission_id ON testcase_results(submission_id); diff --git a/backend/taskqueue/processor.go b/backend/taskqueue/processor.go index 098b565..f789e2e 100644 --- a/backend/taskqueue/processor.go +++ b/backend/taskqueue/processor.go @@ -70,7 +70,7 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { return fmt.Errorf("json.Decode failed: %v", err) } if resData.Result != "success" { - err := p.q.CreateTestcaseExecution(ctx, db.CreateTestcaseExecutionParams{ + err := p.q.CreateTestcaseResult(ctx, db.CreateTestcaseResultParams{ SubmissionID: submissionID, TestcaseID: nil, Status: "compile_error", @@ -78,7 +78,7 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { Stderr: resData.Stderr, }) if err != nil { - return fmt.Errorf("CreateTestcaseExecution failed: %v", err) + return fmt.Errorf("CreateTestcaseResult failed: %v", err) } p.results <- TaskExecResult{ Task: &payload, @@ -114,7 +114,7 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { return fmt.Errorf("json.Decode failed: %v", err) } if resData.Result != "success" { - err := p.q.CreateTestcaseExecution(ctx, db.CreateTestcaseExecutionParams{ + err := p.q.CreateTestcaseResult(ctx, db.CreateTestcaseResultParams{ SubmissionID: submissionID, TestcaseID: nil, Status: "compile_error", @@ -122,7 +122,7 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { Stderr: resData.Stderr, }) if err != nil { - return fmt.Errorf("CreateTestcaseExecution failed: %v", err) + return fmt.Errorf("CreateTestcaseResult failed: %v", err) } p.results <- TaskExecResult{ Task: &payload, @@ -166,7 +166,7 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { return fmt.Errorf("json.Decode failed: %v", err) } if resData.Result != "success" { - err := p.q.CreateTestcaseExecution(ctx, db.CreateTestcaseExecutionParams{ + err := p.q.CreateTestcaseResult(ctx, db.CreateTestcaseResultParams{ SubmissionID: submissionID, TestcaseID: testcase.TestcaseID, Status: resData.Result, @@ -174,7 +174,7 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { Stderr: resData.Stderr, }) if err != nil { - return fmt.Errorf("CreateTestcaseExecution failed: %v", err) + return fmt.Errorf("CreateTestcaseResult failed: %v", err) } p.results <- TaskExecResult{ Task: &payload, @@ -182,8 +182,8 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { } return fmt.Errorf("testrun failed: %v", resData.Stderr) } - if !isTestcaseExecutionCorrect(testcase.Stdout, resData.Stdout) { - err := p.q.CreateTestcaseExecution(ctx, db.CreateTestcaseExecutionParams{ + if !isTestcaseResultCorrect(testcase.Stdout, resData.Stdout) { + err := p.q.CreateTestcaseResult(ctx, db.CreateTestcaseResultParams{ SubmissionID: submissionID, TestcaseID: testcase.TestcaseID, Status: "wrong_answer", @@ -191,7 +191,7 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { Stderr: resData.Stderr, }) if err != nil { - return fmt.Errorf("CreateTestcaseExecution failed: %v", err) + return fmt.Errorf("CreateTestcaseResult failed: %v", err) } p.results <- TaskExecResult{ Task: &payload, @@ -208,7 +208,7 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { return nil } -func isTestcaseExecutionCorrect(expectedStdout, actualStdout string) bool { +func isTestcaseResultCorrect(expectedStdout, actualStdout string) bool { expectedStdout = strings.TrimSpace(expectedStdout) actualStdout = strings.TrimSpace(actualStdout) return actualStdout == expectedStdout -- cgit v1.2.3-70-g09d2 From 4295abfd48ad8ce217b251d07102deb8aee413d5 Mon Sep 17 00:00:00 2001 From: nsfisis Date: Wed, 7 Aug 2024 22:52:44 +0900 Subject: refactor: rename task result's "result" to "status" for consistency --- backend/game/hub.go | 2 +- backend/taskqueue/processor.go | 24 ++++++++++++------------ backend/taskqueue/tasks.go | 2 +- worker/exec.go | 8 ++++---- worker/models.go | 6 +++--- 5 files changed, 21 insertions(+), 21 deletions(-) (limited to 'backend/taskqueue/processor.go') diff --git a/backend/game/hub.go b/backend/game/hub.go index ef69ddb..27a9847 100644 --- a/backend/game/hub.go +++ b/backend/game/hub.go @@ -209,7 +209,7 @@ func (hub *gameHub) processTaskResults() { Type: playerMessageTypeS2CExecResult, Data: playerMessageS2CExecResultPayload{ Score: nil, - Status: api.GamePlayerMessageS2CExecResultPayloadStatus(taskResult.Result), + Status: api.GamePlayerMessageS2CExecResultPayloadStatus(taskResult.Status), }, } } diff --git a/backend/taskqueue/processor.go b/backend/taskqueue/processor.go index f789e2e..135a7d2 100644 --- a/backend/taskqueue/processor.go +++ b/backend/taskqueue/processor.go @@ -49,7 +49,7 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { Code string `json:"code"` } type swiftcResponseData struct { - Result string `json:"result"` + Status string `json:"status"` Stdout string `json:"stdout"` Stderr string `json:"stderr"` } @@ -69,7 +69,7 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { return fmt.Errorf("json.Decode failed: %v", err) } - if resData.Result != "success" { + if resData.Status != "success" { err := p.q.CreateTestcaseResult(ctx, db.CreateTestcaseResultParams{ SubmissionID: submissionID, TestcaseID: nil, @@ -82,7 +82,7 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { } p.results <- TaskExecResult{ Task: &payload, - Result: "compile_error", + Status: "compile_error", } return fmt.Errorf("swiftc failed: %v", resData.Stderr) } @@ -93,7 +93,7 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { Code string `json:"code"` } type wasmcResponseData struct { - Result string `json:"result"` + Status string `json:"status"` Stdout string `json:"stdout"` Stderr string `json:"stderr"` } @@ -113,7 +113,7 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { return fmt.Errorf("json.Decode failed: %v", err) } - if resData.Result != "success" { + if resData.Status != "success" { err := p.q.CreateTestcaseResult(ctx, db.CreateTestcaseResultParams{ SubmissionID: submissionID, TestcaseID: nil, @@ -126,7 +126,7 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { } p.results <- TaskExecResult{ Task: &payload, - Result: "compile_error", + Status: "compile_error", } return fmt.Errorf("wasmc failed: %v", resData.Stderr) } @@ -144,7 +144,7 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { Stdin string `json:"stdin"` } type testrunResponseData struct { - Result string `json:"result"` + Status string `json:"status"` Stdout string `json:"stdout"` Stderr string `json:"stderr"` } @@ -165,11 +165,11 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { return fmt.Errorf("json.Decode failed: %v", err) } - if resData.Result != "success" { + if resData.Status != "success" { err := p.q.CreateTestcaseResult(ctx, db.CreateTestcaseResultParams{ SubmissionID: submissionID, TestcaseID: testcase.TestcaseID, - Status: resData.Result, + Status: resData.Status, Stdout: resData.Stdout, Stderr: resData.Stderr, }) @@ -178,7 +178,7 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { } p.results <- TaskExecResult{ Task: &payload, - Result: resData.Result, + Status: resData.Status, } return fmt.Errorf("testrun failed: %v", resData.Stderr) } @@ -195,7 +195,7 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { } p.results <- TaskExecResult{ Task: &payload, - Result: "wrong_answer", + Status: "wrong_answer", } return fmt.Errorf("testrun failed: %v", resData.Stdout) } @@ -203,7 +203,7 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { p.results <- TaskExecResult{ Task: &payload, - Result: "success", + Status: "success", } return nil } diff --git a/backend/taskqueue/tasks.go b/backend/taskqueue/tasks.go index db05553..5c518f4 100644 --- a/backend/taskqueue/tasks.go +++ b/backend/taskqueue/tasks.go @@ -30,7 +30,7 @@ func NewExecTask(gameID, userID int, code string) (*asynq.Task, error) { type TaskExecResult struct { Task *TaskExecPlayload - Result string + Status string Stdout string Stderr string } diff --git a/worker/exec.go b/worker/exec.go index 2ef16fa..10bc99a 100644 --- a/worker/exec.go +++ b/worker/exec.go @@ -95,7 +95,7 @@ func execSwiftCompile( if err := os.WriteFile(inPath, []byte(code), 0644); err != nil { return swiftCompileResponseData{ - Result: resultInternalError, + Status: resultInternalError, Stdout: "", Stderr: err.Error(), } @@ -116,7 +116,7 @@ func execSwiftCompile( ) return swiftCompileResponseData{ - Result: convertCommandErrorToResultType(err), + Status: convertCommandErrorToResultType(err), Stdout: stdout, Stderr: stderr, } @@ -148,7 +148,7 @@ func execWasmCompile( ) return wasmCompileResponseData{ - Result: convertCommandErrorToResultType(err), + Status: convertCommandErrorToResultType(err), Stdout: stdout, Stderr: stderr, } @@ -179,7 +179,7 @@ func execTestRun( ) return testRunResponseData{ - Result: convertCommandErrorToResultType(err), + Status: convertCommandErrorToResultType(err), Stdout: stdout, Stderr: stderr, } diff --git a/worker/models.go b/worker/models.go index b838fe0..a7310bd 100644 --- a/worker/models.go +++ b/worker/models.go @@ -33,7 +33,7 @@ func (req *swiftCompileRequestData) validate() error { } type swiftCompileResponseData struct { - Result string `json:"result"` + Status string `json:"status"` Stdout string `json:"stdout"` Stderr string `json:"stderr"` } @@ -44,7 +44,7 @@ type wasmCompileRequestData struct { } type wasmCompileResponseData struct { - Result string `json:"result"` + Status string `json:"status"` Stdout string `json:"stdout"` Stderr string `json:"stderr"` } @@ -78,7 +78,7 @@ func (req *testRunRequestData) validate() error { } type testRunResponseData struct { - Result string `json:"result"` + Status string `json:"status"` Stdout string `json:"stdout"` Stderr string `json:"stderr"` } -- cgit v1.2.3-70-g09d2 From 47d81ffbd3e4fe178d2935325e312cef77276250 Mon Sep 17 00:00:00 2001 From: nsfisis Date: Wed, 7 Aug 2024 23:22:07 +0900 Subject: feat(backend): create submission_results table --- backend/db/models.go | 9 +++++++++ backend/db/query.sql.go | 22 ++++++++++++++++++++++ backend/query.sql | 4 ++++ backend/schema.sql | 11 +++++++++++ backend/taskqueue/processor.go | 6 ++---- 5 files changed, 48 insertions(+), 4 deletions(-) (limited to 'backend/taskqueue/processor.go') diff --git a/backend/db/models.go b/backend/db/models.go index 49e4b52..d4cf98b 100644 --- a/backend/db/models.go +++ b/backend/db/models.go @@ -39,6 +39,15 @@ type Submission struct { CreatedAt pgtype.Timestamp } +type SubmissionResult struct { + SubmissionResultID int32 + SubmissionID int32 + Status string + Stdout string + Stderr string + CreatedAt pgtype.Timestamp +} + type Testcase struct { TestcaseID int32 ProblemID int32 diff --git a/backend/db/query.sql.go b/backend/db/query.sql.go index 6c78267..cfb97fd 100644 --- a/backend/db/query.sql.go +++ b/backend/db/query.sql.go @@ -36,6 +36,28 @@ func (q *Queries) CreateSubmission(ctx context.Context, arg CreateSubmissionPara return submission_id, err } +const createSubmissionResult = `-- name: CreateSubmissionResult :exec +INSERT INTO submission_results (submission_id, status, stdout, stderr) +VALUES ($1, $2, $3, $4) +` + +type CreateSubmissionResultParams struct { + SubmissionID int32 + Status string + Stdout string + Stderr string +} + +func (q *Queries) CreateSubmissionResult(ctx context.Context, arg CreateSubmissionResultParams) error { + _, err := q.db.Exec(ctx, createSubmissionResult, + arg.SubmissionID, + arg.Status, + arg.Stdout, + arg.Stderr, + ) + return err +} + const createTestcaseResult = `-- name: CreateTestcaseResult :exec INSERT INTO testcase_results (submission_id, testcase_id, status, stdout, stderr) VALUES ($1, $2, $3, $4, $5) diff --git a/backend/query.sql b/backend/query.sql index 493fdb8..0d32944 100644 --- a/backend/query.sql +++ b/backend/query.sql @@ -68,6 +68,10 @@ SELECT * FROM testcases WHERE testcases.problem_id = (SELECT problem_id FROM games WHERE game_id = $1) ORDER BY testcases.testcase_id; +-- name: CreateSubmissionResult :exec +INSERT INTO submission_results (submission_id, status, stdout, stderr) +VALUES ($1, $2, $3, $4); + -- name: CreateTestcaseResult :exec INSERT INTO testcase_results (submission_id, testcase_id, status, stdout, stderr) VALUES ($1, $2, $3, $4, $5); diff --git a/backend/schema.sql b/backend/schema.sql index 7188d72..74d1202 100644 --- a/backend/schema.sql +++ b/backend/schema.sql @@ -64,6 +64,17 @@ CREATE TABLE submissions ( CONSTRAINT fk_user_id FOREIGN KEY(user_id) REFERENCES users(user_id) ); +CREATE TABLE submission_results ( + submission_result_id SERIAL PRIMARY KEY, + submission_id INT NOT NULL UNIQUE, + status VARCHAR(16) NOT NULL, + stdout TEXT NOT NULL, + stderr TEXT NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + CONSTRAINT fk_submission_id FOREIGN KEY(submission_id) REFERENCES submissions(submission_id) +); +CREATE INDEX idx_submission_results_submission_id ON submission_results(submission_id); + CREATE TABLE testcase_results ( testcase_result_id SERIAL PRIMARY KEY, submission_id INT NOT NULL, diff --git a/backend/taskqueue/processor.go b/backend/taskqueue/processor.go index 135a7d2..b080c46 100644 --- a/backend/taskqueue/processor.go +++ b/backend/taskqueue/processor.go @@ -70,9 +70,8 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { return fmt.Errorf("json.Decode failed: %v", err) } if resData.Status != "success" { - err := p.q.CreateTestcaseResult(ctx, db.CreateTestcaseResultParams{ + err := p.q.CreateSubmissionResult(ctx, db.CreateSubmissionResultParams{ SubmissionID: submissionID, - TestcaseID: nil, Status: "compile_error", Stdout: resData.Stdout, Stderr: resData.Stderr, @@ -114,9 +113,8 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { return fmt.Errorf("json.Decode failed: %v", err) } if resData.Status != "success" { - err := p.q.CreateTestcaseResult(ctx, db.CreateTestcaseResultParams{ + err := p.q.CreateSubmissionResult(ctx, db.CreateSubmissionResultParams{ SubmissionID: submissionID, - TestcaseID: nil, Status: "compile_error", Stdout: resData.Stdout, Stderr: resData.Stderr, -- cgit v1.2.3-70-g09d2 From cfe46bf104dec03c81ca38eb6b6a23b372a271b6 Mon Sep 17 00:00:00 2001 From: nsfisis Date: Wed, 7 Aug 2024 23:49:10 +0900 Subject: feat(backend): split task into smaller task types --- backend/game/hub.go | 54 +++++--- backend/taskqueue/processor.go | 253 ++++++++++++++++++++++--------------- backend/taskqueue/tasks.go | 180 ++++++++++++++++++++++++-- backend/taskqueue/worker_server.go | 32 +++-- 4 files changed, 372 insertions(+), 147 deletions(-) (limited to 'backend/taskqueue/processor.go') diff --git a/backend/game/hub.go b/backend/game/hub.go index 27a9847..58eb180 100644 --- a/backend/game/hub.go +++ b/backend/game/hub.go @@ -33,7 +33,7 @@ type gameHub struct { watchers map[*watcherClient]bool registerWatcher chan *watcherClient unregisterWatcher chan *watcherClient - taskResults chan taskqueue.TaskExecResult + taskResults chan taskqueue.TaskResult } func newGameHub(ctx context.Context, game *game, q *db.Queries, taskQueue *taskqueue.Queue) *gameHub { @@ -49,7 +49,7 @@ func newGameHub(ctx context.Context, game *game, q *db.Queries, taskQueue *taskq watchers: make(map[*watcherClient]bool), registerWatcher: make(chan *watcherClient), unregisterWatcher: make(chan *watcherClient), - taskResults: make(chan taskqueue.TaskExecResult), + taskResults: make(chan taskqueue.TaskResult), } } @@ -161,7 +161,13 @@ func (hub *gameHub) run() { // TODO: assert game state is gaming log.Printf("submit: %v", message.message) code := msg.Data.Code - task, err := taskqueue.NewExecTask(hub.game.gameID, message.client.playerID, code) + codeSize := len(code) // TODO: exclude whitespaces. + task, err := taskqueue.NewTaskCreateSubmissionRecord( + hub.game.gameID, + message.client.playerID, + code, + codeSize, + ) if err != nil { log.Fatalf("failed to create task: %v", err) } @@ -201,19 +207,31 @@ func (hub *gameHub) run() { func (hub *gameHub) processTaskResults() { for taskResult := range hub.taskResults { - for player := range hub.players { - if player.playerID != taskResult.Task.UserID { - continue - } - player.s2cMessages <- &playerMessageS2CExecResult{ - Type: playerMessageTypeS2CExecResult, - Data: playerMessageS2CExecResultPayload{ - Score: nil, - Status: api.GamePlayerMessageS2CExecResultPayloadStatus(taskResult.Status), - }, + switch taskResult := taskResult.(type) { + case *taskqueue.TaskResultCreateSubmissionRecord: + // todo + case *taskqueue.TaskResultCompileSwiftToWasm: + // todo + case *taskqueue.TaskResultCompileWasmToNativeExecutable: + // todo + case *taskqueue.TaskResultRunTestcase: + // todo + for player := range hub.players { + if player.playerID != taskResult.TaskPayload.UserID() { + continue + } + player.s2cMessages <- &playerMessageS2CExecResult{ + Type: playerMessageTypeS2CExecResult, + Data: playerMessageS2CExecResultPayload{ + Score: nil, + Status: api.GamePlayerMessageS2CExecResultPayloadStatus(taskResult.Status), + }, + } } + // broadcast to watchers + default: + panic("unexpected task result type") } - // broadcast to watchers } } @@ -270,10 +288,10 @@ type GameHubs struct { hubs map[int]*gameHub q *db.Queries taskQueue *taskqueue.Queue - taskResults chan taskqueue.TaskExecResult + taskResults chan taskqueue.TaskResult } -func NewGameHubs(q *db.Queries, taskQueue *taskqueue.Queue, taskResults chan taskqueue.TaskExecResult) *GameHubs { +func NewGameHubs(q *db.Queries, taskQueue *taskqueue.Queue, taskResults chan taskqueue.TaskResult) *GameHubs { return &GameHubs{ hubs: make(map[int]*gameHub), q: q, @@ -339,9 +357,9 @@ func (hubs *GameHubs) Run() { } for taskResult := range hubs.taskResults { - hub := hubs.getHub(taskResult.Task.GameID) + hub := hubs.getHub(taskResult.GameID()) if hub == nil { - log.Printf("no such game: %d", taskResult.Task.GameID) + log.Printf("no such game: %d", taskResult.GameID()) continue } hub.taskResults <- taskResult diff --git a/backend/taskqueue/processor.go b/backend/taskqueue/processor.go index b080c46..556bd78 100644 --- a/backend/taskqueue/processor.go +++ b/backend/taskqueue/processor.go @@ -13,125 +13,169 @@ import ( "github.com/nsfisis/iosdc-japan-2024-albatross/backend/db" ) -type ExecProcessor struct { +type processor struct { q *db.Queries - results chan TaskExecResult + results chan TaskResult } -func NewExecProcessor(q *db.Queries, results chan TaskExecResult) *ExecProcessor { - return &ExecProcessor{ +func newProcessor(q *db.Queries) *processor { + return &processor{ q: q, - results: results, + results: make(chan TaskResult), } } -func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { - var payload TaskExecPlayload +func (p *processor) processTaskCreateSubmissionRecord(ctx context.Context, t *asynq.Task) error { + var payload TaskPayloadCreateSubmissionRecord if err := json.Unmarshal(t.Payload(), &payload); err != nil { + // TODO: send result return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) } // TODO: upsert // Create submission record. submissionID, err := p.q.CreateSubmission(ctx, db.CreateSubmissionParams{ - GameID: int32(payload.GameID), - UserID: int32(payload.UserID), - Code: payload.Code, - CodeSize: int32(len(payload.Code)), // TODO: exclude whitespaces. + GameID: int32(payload.GameID()), + UserID: int32(payload.UserID()), + Code: payload.Code(), + CodeSize: int32(payload.CodeSize), }) if err != nil { + // TODO: send result return fmt.Errorf("CreateSubmission failed: %v", err) } - { - type swiftcRequestData struct { - MaxDuration int `json:"max_duration_ms"` - Code string `json:"code"` - } - type swiftcResponseData struct { - Status string `json:"status"` - Stdout string `json:"stdout"` - Stderr string `json:"stderr"` - } - reqData := swiftcRequestData{ - MaxDuration: 5000, - Code: payload.Code, - } - reqJson, err := json.Marshal(reqData) - if err != nil { - return fmt.Errorf("json.Marshal failed: %v", err) - } - res, err := http.Post("http://worker:80/api/swiftc", "application/json", bytes.NewBuffer(reqJson)) + p.results <- &TaskResultCreateSubmissionRecord{ + TaskPayload: &payload, + SubmissionID: int(submissionID), + } + return nil +} + +func (p *processor) processTaskCompileSwiftToWasm(ctx context.Context, t *asynq.Task) error { + var payload TaskPayloadCompileSwiftToWasm + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + // TODO: send result + return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) + } + + type swiftcRequestData struct { + MaxDuration int `json:"max_duration_ms"` + Code string `json:"code"` + } + type swiftcResponseData struct { + Status string `json:"status"` + Stdout string `json:"stdout"` + Stderr string `json:"stderr"` + } + reqData := swiftcRequestData{ + MaxDuration: 5000, + Code: payload.Code(), + } + reqJson, err := json.Marshal(reqData) + if err != nil { + // TODO: send result + return fmt.Errorf("json.Marshal failed: %v", err) + } + res, err := http.Post("http://worker:80/api/swiftc", "application/json", bytes.NewBuffer(reqJson)) + if err != nil { + // TODO: send result + return fmt.Errorf("http.Post failed: %v", err) + } + resData := swiftcResponseData{} + if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { + // TODO: send result + return fmt.Errorf("json.Decode failed: %v", err) + } + if resData.Status != "success" { + err := p.q.CreateSubmissionResult(ctx, db.CreateSubmissionResultParams{ + SubmissionID: int32(payload.SubmissionID), + Status: "compile_error", + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }) if err != nil { - return fmt.Errorf("http.Post failed: %v", err) + // TODO: send result + return fmt.Errorf("CreateTestcaseResult failed: %v", err) } - resData := swiftcResponseData{} - if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { - return fmt.Errorf("json.Decode failed: %v", err) - } - if resData.Status != "success" { - err := p.q.CreateSubmissionResult(ctx, db.CreateSubmissionResultParams{ - SubmissionID: submissionID, - Status: "compile_error", - Stdout: resData.Stdout, - Stderr: resData.Stderr, - }) - if err != nil { - return fmt.Errorf("CreateTestcaseResult failed: %v", err) - } - p.results <- TaskExecResult{ - Task: &payload, - Status: "compile_error", - } - return fmt.Errorf("swiftc failed: %v", resData.Stderr) + p.results <- &TaskResultCompileSwiftToWasm{ + TaskPayload: &payload, + Status: "compile_error", } + return fmt.Errorf("swiftc failed: %v", resData.Stderr) } - { - type wasmcRequestData struct { - MaxDuration int `json:"max_duration_ms"` - Code string `json:"code"` - } - type wasmcResponseData struct { - Status string `json:"status"` - Stdout string `json:"stdout"` - Stderr string `json:"stderr"` - } - reqData := wasmcRequestData{ - MaxDuration: 5000, - Code: payload.Code, - } - reqJson, err := json.Marshal(reqData) - if err != nil { - return fmt.Errorf("json.Marshal failed: %v", err) - } - res, err := http.Post("http://worker:80/api/wasmc", "application/json", bytes.NewBuffer(reqJson)) + + // TODO: send result + return nil +} + +func (p *processor) processTaskCompileWasmToNativeExecutable(ctx context.Context, t *asynq.Task) error { + var payload TaskPayloadCompileWasmToNativeExecutable + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + // TODO: send result + return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) + } + + type wasmcRequestData struct { + MaxDuration int `json:"max_duration_ms"` + Code string `json:"code"` + } + type wasmcResponseData struct { + Status string `json:"status"` + Stdout string `json:"stdout"` + Stderr string `json:"stderr"` + } + reqData := wasmcRequestData{ + MaxDuration: 5000, + Code: payload.Code(), + } + reqJson, err := json.Marshal(reqData) + if err != nil { + // TODO: send result + return fmt.Errorf("json.Marshal failed: %v", err) + } + res, err := http.Post("http://worker:80/api/wasmc", "application/json", bytes.NewBuffer(reqJson)) + if err != nil { + // TODO: send result + return fmt.Errorf("http.Post failed: %v", err) + } + resData := wasmcResponseData{} + if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { + // TODO: send result + return fmt.Errorf("json.Decode failed: %v", err) + } + if resData.Status != "success" { + err := p.q.CreateSubmissionResult(ctx, db.CreateSubmissionResultParams{ + SubmissionID: int32(payload.SubmissionID), + Status: "compile_error", + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }) if err != nil { - return fmt.Errorf("http.Post failed: %v", err) + // TODO: send result + return fmt.Errorf("CreateTestcaseResult failed: %v", err) } - resData := wasmcResponseData{} - if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { - return fmt.Errorf("json.Decode failed: %v", err) - } - if resData.Status != "success" { - err := p.q.CreateSubmissionResult(ctx, db.CreateSubmissionResultParams{ - SubmissionID: submissionID, - Status: "compile_error", - Stdout: resData.Stdout, - Stderr: resData.Stderr, - }) - if err != nil { - return fmt.Errorf("CreateTestcaseResult failed: %v", err) - } - p.results <- TaskExecResult{ - Task: &payload, - Status: "compile_error", - } - return fmt.Errorf("wasmc failed: %v", resData.Stderr) + p.results <- &TaskResultCompileWasmToNativeExecutable{ + TaskPayload: &payload, + Status: "compile_error", } + return fmt.Errorf("wasmc failed: %v", resData.Stderr) + } + + // TODO: send result + return nil +} + +func (p *processor) processTaskRunTestcase(ctx context.Context, t *asynq.Task) error { + var payload TaskPayloadRunTestcase + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + // TODO: send result + return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) } - testcases, err := p.q.ListTestcasesByGameID(ctx, int32(payload.GameID)) + testcases, err := p.q.ListTestcasesByGameID(ctx, int32(payload.GameID())) if err != nil { + // TODO: send result return fmt.Errorf("ListTestcasesByGameID failed: %v", err) } @@ -148,60 +192,65 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { } reqData := testrunRequestData{ MaxDuration: 5000, - Code: payload.Code, + Code: payload.Code(), Stdin: testcase.Stdin, } reqJson, err := json.Marshal(reqData) if err != nil { + // TODO: send result return fmt.Errorf("json.Marshal failed: %v", err) } res, err := http.Post("http://worker:80/api/testrun", "application/json", bytes.NewBuffer(reqJson)) if err != nil { + // TODO: send result return fmt.Errorf("http.Post failed: %v", err) } resData := testrunResponseData{} if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { + // TODO: send result return fmt.Errorf("json.Decode failed: %v", err) } if resData.Status != "success" { err := p.q.CreateTestcaseResult(ctx, db.CreateTestcaseResultParams{ - SubmissionID: submissionID, + SubmissionID: int32(payload.SubmissionID), TestcaseID: testcase.TestcaseID, Status: resData.Status, Stdout: resData.Stdout, Stderr: resData.Stderr, }) if err != nil { + // TODO: send result return fmt.Errorf("CreateTestcaseResult failed: %v", err) } - p.results <- TaskExecResult{ - Task: &payload, - Status: resData.Status, + p.results <- &TaskResultRunTestcase{ + TaskPayload: &payload, + Status: resData.Status, } return fmt.Errorf("testrun failed: %v", resData.Stderr) } if !isTestcaseResultCorrect(testcase.Stdout, resData.Stdout) { err := p.q.CreateTestcaseResult(ctx, db.CreateTestcaseResultParams{ - SubmissionID: submissionID, + SubmissionID: int32(payload.SubmissionID), TestcaseID: testcase.TestcaseID, Status: "wrong_answer", Stdout: resData.Stdout, Stderr: resData.Stderr, }) if err != nil { + // TODO: send result return fmt.Errorf("CreateTestcaseResult failed: %v", err) } - p.results <- TaskExecResult{ - Task: &payload, - Status: "wrong_answer", + p.results <- &TaskResultRunTestcase{ + TaskPayload: &payload, + Status: "wrong_answer", } return fmt.Errorf("testrun failed: %v", resData.Stdout) } } - p.results <- TaskExecResult{ - Task: &payload, - Status: "success", + p.results <- &TaskResultRunTestcase{ + TaskPayload: &payload, + Status: "success", } return nil } diff --git a/backend/taskqueue/tasks.go b/backend/taskqueue/tasks.go index 5c518f4..679b6a0 100644 --- a/backend/taskqueue/tasks.go +++ b/backend/taskqueue/tasks.go @@ -6,31 +6,185 @@ import ( "github.com/hibiken/asynq" ) +type TaskType string + const ( - TaskTypeExec = "exec" + TaskTypeCreateSubmissionRecord TaskType = "create_submission_record" + TaskTypeCompileSwiftToWasm TaskType = "compile_swift_to_wasm" + TaskTypeCompileWasmToNativeExecutable TaskType = "compile_wasm_to_native_executable" + TaskTypeRunTestcase TaskType = "run_testcase" ) -type TaskExecPlayload struct { +type TaskPayloadBase struct { GameID int UserID int Code string } -func NewExecTask(gameID, userID int, code string) (*asynq.Task, error) { - payload, err := json.Marshal(TaskExecPlayload{ - GameID: gameID, - UserID: userID, - Code: code, +type TaskPayloadCreateSubmissionRecord struct { + TaskPayloadBase + CodeSize int +} + +func NewTaskCreateSubmissionRecord( + gameID int, + userID int, + code string, + codeSize int, +) (*asynq.Task, error) { + payload, err := json.Marshal(TaskPayloadCreateSubmissionRecord{ + TaskPayloadBase: TaskPayloadBase{ + GameID: gameID, + UserID: userID, + Code: code, + }, + CodeSize: codeSize, + }) + if err != nil { + return nil, err + } + return asynq.NewTask(string(TaskTypeCreateSubmissionRecord), payload), nil +} + +func (t *TaskPayloadCreateSubmissionRecord) GameID() int { return t.TaskPayloadBase.GameID } +func (t *TaskPayloadCreateSubmissionRecord) UserID() int { return t.TaskPayloadBase.UserID } +func (t *TaskPayloadCreateSubmissionRecord) Code() string { return t.TaskPayloadBase.Code } + +type TaskPayloadCompileSwiftToWasm struct { + TaskPayloadBase + SubmissionID int +} + +func NewTaskCompileSwiftToWasm( + gameID int, + userID int, + code string, + submissionID int, +) (*asynq.Task, error) { + payload, err := json.Marshal(TaskPayloadCompileSwiftToWasm{ + TaskPayloadBase: TaskPayloadBase{ + GameID: gameID, + UserID: userID, + Code: code, + }, + SubmissionID: submissionID, + }) + if err != nil { + return nil, err + } + return asynq.NewTask(string(TaskTypeCompileSwiftToWasm), payload), nil +} + +func (t *TaskPayloadCompileSwiftToWasm) GameID() int { return t.TaskPayloadBase.GameID } +func (t *TaskPayloadCompileSwiftToWasm) UserID() int { return t.TaskPayloadBase.UserID } +func (t *TaskPayloadCompileSwiftToWasm) Code() string { return t.TaskPayloadBase.Code } + +type TaskPayloadCompileWasmToNativeExecutable struct { + TaskPayloadBase + SubmissionID int +} + +func NewTaskCompileWasmToNativeExecutable( + gameID int, + userID int, + code string, + submissionID int, +) (*asynq.Task, error) { + payload, err := json.Marshal(TaskPayloadCompileWasmToNativeExecutable{ + TaskPayloadBase: TaskPayloadBase{ + GameID: gameID, + UserID: userID, + Code: code, + }, + SubmissionID: submissionID, + }) + if err != nil { + return nil, err + } + return asynq.NewTask(string(TaskTypeCompileWasmToNativeExecutable), payload), nil +} + +func (t *TaskPayloadCompileWasmToNativeExecutable) GameID() int { return t.TaskPayloadBase.GameID } +func (t *TaskPayloadCompileWasmToNativeExecutable) UserID() int { return t.TaskPayloadBase.UserID } +func (t *TaskPayloadCompileWasmToNativeExecutable) Code() string { return t.TaskPayloadBase.Code } + +type TaskPayloadRunTestcase struct { + TaskPayloadBase + SubmissionID int + TestcaseID int +} + +func NewTaskRunTestcase( + gameID int, + userID int, + code string, + submissionID int, + testcaseID int, +) (*asynq.Task, error) { + payload, err := json.Marshal(TaskPayloadRunTestcase{ + TaskPayloadBase: TaskPayloadBase{ + GameID: gameID, + UserID: userID, + Code: code, + }, + SubmissionID: submissionID, + TestcaseID: testcaseID, }) if err != nil { return nil, err } - return asynq.NewTask(TaskTypeExec, payload), nil + return asynq.NewTask(string(TaskTypeRunTestcase), payload), nil +} + +func (t *TaskPayloadRunTestcase) GameID() int { return t.TaskPayloadBase.GameID } +func (t *TaskPayloadRunTestcase) UserID() int { return t.TaskPayloadBase.UserID } +func (t *TaskPayloadRunTestcase) Code() string { return t.TaskPayloadBase.Code } + +type TaskResult interface { + Type() TaskType + GameID() int } -type TaskExecResult struct { - Task *TaskExecPlayload - Status string - Stdout string - Stderr string +type TaskResultCreateSubmissionRecord struct { + TaskPayload *TaskPayloadCreateSubmissionRecord + SubmissionID int + Err error } + +func (r *TaskResultCreateSubmissionRecord) Type() TaskType { return TaskTypeCreateSubmissionRecord } +func (r *TaskResultCreateSubmissionRecord) GameID() int { return r.TaskPayload.GameID() } + +type TaskResultCompileSwiftToWasm struct { + TaskPayload *TaskPayloadCompileSwiftToWasm + Status string + Stdout string + Stderr string + Err error +} + +func (r *TaskResultCompileSwiftToWasm) Type() TaskType { return TaskTypeCompileSwiftToWasm } +func (r *TaskResultCompileSwiftToWasm) GameID() int { return r.TaskPayload.GameID() } + +type TaskResultCompileWasmToNativeExecutable struct { + TaskPayload *TaskPayloadCompileWasmToNativeExecutable + Status string + Stdout string + Stderr string + Err error +} + +func (r *TaskResultCompileWasmToNativeExecutable) Type() TaskType { + return TaskTypeCompileWasmToNativeExecutable +} +func (r *TaskResultCompileWasmToNativeExecutable) GameID() int { return r.TaskPayload.GameID() } + +type TaskResultRunTestcase struct { + TaskPayload *TaskPayloadRunTestcase + Status string + Stdout string + Stderr string + Err error +} + +func (r *TaskResultRunTestcase) Type() TaskType { return TaskTypeRunTestcase } +func (r *TaskResultRunTestcase) GameID() int { return r.TaskPayload.GameID() } diff --git a/backend/taskqueue/worker_server.go b/backend/taskqueue/worker_server.go index 67803e2..6dc65d8 100644 --- a/backend/taskqueue/worker_server.go +++ b/backend/taskqueue/worker_server.go @@ -7,31 +7,35 @@ import ( ) type WorkerServer struct { - server *asynq.Server - queries *db.Queries - results chan TaskExecResult + server *asynq.Server + processor *processor } func NewWorkerServer(redisAddr string, queries *db.Queries) *WorkerServer { + server := asynq.NewServer( + asynq.RedisClientOpt{ + Addr: redisAddr, + }, + asynq.Config{}, + ) + processor := newProcessor(queries) return &WorkerServer{ - server: asynq.NewServer( - asynq.RedisClientOpt{ - Addr: redisAddr, - }, - asynq.Config{}, - ), - queries: queries, - results: make(chan TaskExecResult), + server: server, + processor: processor, } } func (s *WorkerServer) Run() error { mux := asynq.NewServeMux() - mux.Handle(TaskTypeExec, NewExecProcessor(s.queries, s.results)) + + mux.HandleFunc(string(TaskTypeCreateSubmissionRecord), s.processor.processTaskCreateSubmissionRecord) + mux.HandleFunc(string(TaskTypeCompileSwiftToWasm), s.processor.processTaskCompileSwiftToWasm) + mux.HandleFunc(string(TaskTypeCompileWasmToNativeExecutable), s.processor.processTaskCompileWasmToNativeExecutable) + mux.HandleFunc(string(TaskTypeRunTestcase), s.processor.processTaskRunTestcase) return s.server.Run(mux) } -func (s *WorkerServer) Results() chan TaskExecResult { - return s.results +func (s *WorkerServer) Results() chan TaskResult { + return s.processor.results } -- cgit v1.2.3-70-g09d2 From 339f99a26191ede59a9eb0de2819cb5efdeb1535 Mon Sep 17 00:00:00 2001 From: nsfisis Date: Thu, 8 Aug 2024 00:46:45 +0900 Subject: refactor(backend): wrap taskqueue.processor --- backend/taskqueue/processor.go | 168 ++++++++++++++------------------- backend/taskqueue/processor_wrapper.go | 110 +++++++++++++++++++++ backend/taskqueue/worker_server.go | 4 +- 3 files changed, 185 insertions(+), 97 deletions(-) create mode 100644 backend/taskqueue/processor_wrapper.go (limited to 'backend/taskqueue/processor.go') diff --git a/backend/taskqueue/processor.go b/backend/taskqueue/processor.go index 556bd78..d771e61 100644 --- a/backend/taskqueue/processor.go +++ b/backend/taskqueue/processor.go @@ -8,32 +8,24 @@ import ( "net/http" "strings" - "github.com/hibiken/asynq" - "github.com/nsfisis/iosdc-japan-2024-albatross/backend/db" ) type processor struct { - q *db.Queries - results chan TaskResult + q *db.Queries } -func newProcessor(q *db.Queries) *processor { - return &processor{ - q: q, - results: make(chan TaskResult), +func newProcessor(q *db.Queries) processor { + return processor{ + q: q, } } -func (p *processor) processTaskCreateSubmissionRecord(ctx context.Context, t *asynq.Task) error { - var payload TaskPayloadCreateSubmissionRecord - if err := json.Unmarshal(t.Payload(), &payload); err != nil { - // TODO: send result - return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) - } - +func (p *processor) doProcessTaskCreateSubmissionRecord( + ctx context.Context, + payload *TaskPayloadCreateSubmissionRecord, +) (*TaskResultCreateSubmissionRecord, error) { // TODO: upsert - // Create submission record. submissionID, err := p.q.CreateSubmission(ctx, db.CreateSubmissionParams{ GameID: int32(payload.GameID()), UserID: int32(payload.UserID()), @@ -41,24 +33,19 @@ func (p *processor) processTaskCreateSubmissionRecord(ctx context.Context, t *as CodeSize: int32(payload.CodeSize), }) if err != nil { - // TODO: send result - return fmt.Errorf("CreateSubmission failed: %v", err) + return nil, err } - p.results <- &TaskResultCreateSubmissionRecord{ - TaskPayload: &payload, + return &TaskResultCreateSubmissionRecord{ + TaskPayload: payload, SubmissionID: int(submissionID), - } - return nil + }, nil } -func (p *processor) processTaskCompileSwiftToWasm(ctx context.Context, t *asynq.Task) error { - var payload TaskPayloadCompileSwiftToWasm - if err := json.Unmarshal(t.Payload(), &payload); err != nil { - // TODO: send result - return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) - } - +func (p *processor) doProcessTaskCompileSwiftToWasm( + ctx context.Context, + payload *TaskPayloadCompileSwiftToWasm, +) (*TaskResultCompileSwiftToWasm, error) { type swiftcRequestData struct { MaxDuration int `json:"max_duration_ms"` Code string `json:"code"` @@ -74,18 +61,15 @@ func (p *processor) processTaskCompileSwiftToWasm(ctx context.Context, t *asynq. } reqJson, err := json.Marshal(reqData) if err != nil { - // TODO: send result - return fmt.Errorf("json.Marshal failed: %v", err) + return nil, fmt.Errorf("json.Marshal failed: %v", err) } res, err := http.Post("http://worker:80/api/swiftc", "application/json", bytes.NewBuffer(reqJson)) if err != nil { - // TODO: send result - return fmt.Errorf("http.Post failed: %v", err) + return nil, fmt.Errorf("http.Post failed: %v", err) } resData := swiftcResponseData{} if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { - // TODO: send result - return fmt.Errorf("json.Decode failed: %v", err) + return nil, fmt.Errorf("json.Decode failed: %v", err) } if resData.Status != "success" { err := p.q.CreateSubmissionResult(ctx, db.CreateSubmissionResultParams{ @@ -95,27 +79,28 @@ func (p *processor) processTaskCompileSwiftToWasm(ctx context.Context, t *asynq. Stderr: resData.Stderr, }) if err != nil { - // TODO: send result - return fmt.Errorf("CreateTestcaseResult failed: %v", err) + return nil, fmt.Errorf("CreateTestcaseResult failed: %v", err) } - p.results <- &TaskResultCompileSwiftToWasm{ - TaskPayload: &payload, + return &TaskResultCompileSwiftToWasm{ + TaskPayload: payload, Status: "compile_error", - } - return fmt.Errorf("swiftc failed: %v", resData.Stderr) + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }, nil } - // TODO: send result - return nil + return &TaskResultCompileSwiftToWasm{ + TaskPayload: payload, + Status: "success", + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }, nil } -func (p *processor) processTaskCompileWasmToNativeExecutable(ctx context.Context, t *asynq.Task) error { - var payload TaskPayloadCompileWasmToNativeExecutable - if err := json.Unmarshal(t.Payload(), &payload); err != nil { - // TODO: send result - return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) - } - +func (p *processor) doProcessTaskCompileWasmToNativeExecutable( + ctx context.Context, + payload *TaskPayloadCompileWasmToNativeExecutable, +) (*TaskResultCompileWasmToNativeExecutable, error) { type wasmcRequestData struct { MaxDuration int `json:"max_duration_ms"` Code string `json:"code"` @@ -131,18 +116,15 @@ func (p *processor) processTaskCompileWasmToNativeExecutable(ctx context.Context } reqJson, err := json.Marshal(reqData) if err != nil { - // TODO: send result - return fmt.Errorf("json.Marshal failed: %v", err) + return nil, fmt.Errorf("json.Marshal failed: %v", err) } res, err := http.Post("http://worker:80/api/wasmc", "application/json", bytes.NewBuffer(reqJson)) if err != nil { - // TODO: send result - return fmt.Errorf("http.Post failed: %v", err) + return nil, fmt.Errorf("http.Post failed: %v", err) } resData := wasmcResponseData{} if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { - // TODO: send result - return fmt.Errorf("json.Decode failed: %v", err) + return nil, fmt.Errorf("json.Decode failed: %v", err) } if resData.Status != "success" { err := p.q.CreateSubmissionResult(ctx, db.CreateSubmissionResultParams{ @@ -152,31 +134,31 @@ func (p *processor) processTaskCompileWasmToNativeExecutable(ctx context.Context Stderr: resData.Stderr, }) if err != nil { - // TODO: send result - return fmt.Errorf("CreateTestcaseResult failed: %v", err) + return nil, fmt.Errorf("CreateTestcaseResult failed: %v", err) } - p.results <- &TaskResultCompileWasmToNativeExecutable{ - TaskPayload: &payload, + return &TaskResultCompileWasmToNativeExecutable{ + TaskPayload: payload, Status: "compile_error", - } - return fmt.Errorf("wasmc failed: %v", resData.Stderr) + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }, nil } - // TODO: send result - return nil + return &TaskResultCompileWasmToNativeExecutable{ + TaskPayload: payload, + Status: "success", + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }, nil } -func (p *processor) processTaskRunTestcase(ctx context.Context, t *asynq.Task) error { - var payload TaskPayloadRunTestcase - if err := json.Unmarshal(t.Payload(), &payload); err != nil { - // TODO: send result - return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) - } - +func (p *processor) doProcessTaskRunTestcase( + ctx context.Context, + payload *TaskPayloadRunTestcase, +) (*TaskResultRunTestcase, error) { testcases, err := p.q.ListTestcasesByGameID(ctx, int32(payload.GameID())) if err != nil { - // TODO: send result - return fmt.Errorf("ListTestcasesByGameID failed: %v", err) + return nil, fmt.Errorf("ListTestcasesByGameID failed: %v", err) } for _, testcase := range testcases { @@ -197,18 +179,15 @@ func (p *processor) processTaskRunTestcase(ctx context.Context, t *asynq.Task) e } reqJson, err := json.Marshal(reqData) if err != nil { - // TODO: send result - return fmt.Errorf("json.Marshal failed: %v", err) + return nil, fmt.Errorf("json.Marshal failed: %v", err) } res, err := http.Post("http://worker:80/api/testrun", "application/json", bytes.NewBuffer(reqJson)) if err != nil { - // TODO: send result - return fmt.Errorf("http.Post failed: %v", err) + return nil, fmt.Errorf("http.Post failed: %v", err) } resData := testrunResponseData{} if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { - // TODO: send result - return fmt.Errorf("json.Decode failed: %v", err) + return nil, fmt.Errorf("json.Decode failed: %v", err) } if resData.Status != "success" { err := p.q.CreateTestcaseResult(ctx, db.CreateTestcaseResultParams{ @@ -219,14 +198,14 @@ func (p *processor) processTaskRunTestcase(ctx context.Context, t *asynq.Task) e Stderr: resData.Stderr, }) if err != nil { - // TODO: send result - return fmt.Errorf("CreateTestcaseResult failed: %v", err) + return nil, fmt.Errorf("CreateTestcaseResult failed: %v", err) } - p.results <- &TaskResultRunTestcase{ - TaskPayload: &payload, + return &TaskResultRunTestcase{ + TaskPayload: payload, Status: resData.Status, - } - return fmt.Errorf("testrun failed: %v", resData.Stderr) + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }, nil } if !isTestcaseResultCorrect(testcase.Stdout, resData.Stdout) { err := p.q.CreateTestcaseResult(ctx, db.CreateTestcaseResultParams{ @@ -237,22 +216,21 @@ func (p *processor) processTaskRunTestcase(ctx context.Context, t *asynq.Task) e Stderr: resData.Stderr, }) if err != nil { - // TODO: send result - return fmt.Errorf("CreateTestcaseResult failed: %v", err) + return nil, fmt.Errorf("CreateTestcaseResult failed: %v", err) } - p.results <- &TaskResultRunTestcase{ - TaskPayload: &payload, + return &TaskResultRunTestcase{ + TaskPayload: payload, Status: "wrong_answer", - } - return fmt.Errorf("testrun failed: %v", resData.Stdout) + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }, nil } } - p.results <- &TaskResultRunTestcase{ - TaskPayload: &payload, + return &TaskResultRunTestcase{ + TaskPayload: payload, Status: "success", - } - return nil + }, nil } func isTestcaseResultCorrect(expectedStdout, actualStdout string) bool { diff --git a/backend/taskqueue/processor_wrapper.go b/backend/taskqueue/processor_wrapper.go new file mode 100644 index 0000000..1ae9f2f --- /dev/null +++ b/backend/taskqueue/processor_wrapper.go @@ -0,0 +1,110 @@ +package taskqueue + +import ( + "context" + "encoding/json" + "errors" + "fmt" + + "github.com/hibiken/asynq" +) + +type processorWrapper struct { + impl processor + results chan TaskResult +} + +func newProcessorWrapper(impl processor) *processorWrapper { + return &processorWrapper{ + impl: impl, + results: make(chan TaskResult), + } +} + +func (p *processorWrapper) processTaskCompileSwiftToWasm(ctx context.Context, t *asynq.Task) error { + var payload TaskPayloadCompileSwiftToWasm + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) + p.results <- &TaskResultCompileSwiftToWasm{Err: err} + return err + } + + result, err := p.impl.doProcessTaskCompileSwiftToWasm(ctx, &payload) + if err != nil { + retryCount, _ := asynq.GetRetryCount(ctx) + maxRetry, _ := asynq.GetMaxRetry(ctx) + isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry + if !isRecoverable { + p.results <- &TaskResultCompileSwiftToWasm{Err: err} + } + return err + } + p.results <- result + return nil +} + +func (p *processorWrapper) processTaskCompileWasmToNativeExecutable(ctx context.Context, t *asynq.Task) error { + var payload TaskPayloadCompileWasmToNativeExecutable + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) + p.results <- &TaskResultCompileWasmToNativeExecutable{Err: err} + return err + } + + result, err := p.impl.doProcessTaskCompileWasmToNativeExecutable(ctx, &payload) + if err != nil { + retryCount, _ := asynq.GetRetryCount(ctx) + maxRetry, _ := asynq.GetMaxRetry(ctx) + isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry + if !isRecoverable { + p.results <- &TaskResultCompileWasmToNativeExecutable{Err: err} + } + return err + } + p.results <- result + return nil +} + +func (p *processorWrapper) processTaskCreateSubmissionRecord(ctx context.Context, t *asynq.Task) error { + var payload TaskPayloadCreateSubmissionRecord + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) + p.results <- &TaskResultCreateSubmissionRecord{Err: err} + return err + } + + result, err := p.impl.doProcessTaskCreateSubmissionRecord(ctx, &payload) + if err != nil { + retryCount, _ := asynq.GetRetryCount(ctx) + maxRetry, _ := asynq.GetMaxRetry(ctx) + isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry + if !isRecoverable { + p.results <- &TaskResultCreateSubmissionRecord{Err: err} + } + return err + } + p.results <- result + return nil +} + +func (p *processorWrapper) processTaskRunTestcase(ctx context.Context, t *asynq.Task) error { + var payload TaskPayloadRunTestcase + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + err := fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) + p.results <- &TaskResultRunTestcase{Err: err} + return err + } + + result, err := p.impl.doProcessTaskRunTestcase(ctx, &payload) + if err != nil { + retryCount, _ := asynq.GetRetryCount(ctx) + maxRetry, _ := asynq.GetMaxRetry(ctx) + isRecoverable := !errors.Is(err, asynq.SkipRetry) && retryCount < maxRetry + if !isRecoverable { + p.results <- &TaskResultRunTestcase{Err: err} + } + return err + } + p.results <- result + return nil +} diff --git a/backend/taskqueue/worker_server.go b/backend/taskqueue/worker_server.go index 6dc65d8..317f61c 100644 --- a/backend/taskqueue/worker_server.go +++ b/backend/taskqueue/worker_server.go @@ -8,7 +8,7 @@ import ( type WorkerServer struct { server *asynq.Server - processor *processor + processor *processorWrapper } func NewWorkerServer(redisAddr string, queries *db.Queries) *WorkerServer { @@ -18,7 +18,7 @@ func NewWorkerServer(redisAddr string, queries *db.Queries) *WorkerServer { }, asynq.Config{}, ) - processor := newProcessor(queries) + processor := newProcessorWrapper(newProcessor(queries)) return &WorkerServer{ server: server, processor: processor, -- cgit v1.2.3-70-g09d2 From 4eb7e89d6a77a4434bd087fbb86873521d30a8f5 Mon Sep 17 00:00:00 2001 From: nsfisis Date: Thu, 8 Aug 2024 01:30:09 +0900 Subject: feat(backend): implement processTaskResults() partially --- backend/game/hub.go | 274 ++++++++++++++++++++++++++++++++++++++--- backend/taskqueue/processor.go | 148 ++++++---------------- backend/taskqueue/queue.go | 4 + backend/taskqueue/tasks.go | 6 + 4 files changed, 305 insertions(+), 127 deletions(-) (limited to 'backend/taskqueue/processor.go') diff --git a/backend/game/hub.go b/backend/game/hub.go index b51d977..d17ff7c 100644 --- a/backend/game/hub.go +++ b/backend/game/hub.go @@ -4,6 +4,7 @@ import ( "context" "errors" "log" + "strings" "time" "github.com/jackc/pgx/v5/pgtype" @@ -204,36 +205,273 @@ func (hub *gameHub) run() { } } +type codeSubmissionError struct { + Status string + Stdout string + Stderr string +} + +func (err *codeSubmissionError) Error() string { + return err.Stderr +} + func (hub *gameHub) processTaskResults() { for taskResult := range hub.taskResults { switch taskResult := taskResult.(type) { case *taskqueue.TaskResultCreateSubmissionRecord: - // todo + err := hub.processTaskResultCreateSubmissionRecord(taskResult) + if err != nil { + for player := range hub.players { + if player.playerID != taskResult.TaskPayload.UserID() { + continue + } + player.s2cMessages <- &playerMessageS2CExecResult{ + Type: playerMessageTypeS2CExecResult, + Data: playerMessageS2CExecResultPayload{ + Score: nil, + Status: api.GamePlayerMessageS2CExecResultPayloadStatus(err.Status), + }, + } + } + // TODO: broadcast to watchers + } case *taskqueue.TaskResultCompileSwiftToWasm: - // todo + err := hub.processTaskResultCompileSwiftToWasm(taskResult) + if err != nil { + for player := range hub.players { + if player.playerID != taskResult.TaskPayload.UserID() { + continue + } + player.s2cMessages <- &playerMessageS2CExecResult{ + Type: playerMessageTypeS2CExecResult, + Data: playerMessageS2CExecResultPayload{ + Score: nil, + Status: api.GamePlayerMessageS2CExecResultPayloadStatus(err.Status), + }, + } + } + // TODO: broadcast to watchers + } case *taskqueue.TaskResultCompileWasmToNativeExecutable: - // todo - case *taskqueue.TaskResultRunTestcase: - // todo - for player := range hub.players { - if player.playerID != taskResult.TaskPayload.UserID() { - continue + err := hub.processTaskResultCompileWasmToNativeExecutable(taskResult) + if err != nil { + for player := range hub.players { + if player.playerID != taskResult.TaskPayload.UserID() { + continue + } + player.s2cMessages <- &playerMessageS2CExecResult{ + Type: playerMessageTypeS2CExecResult, + Data: playerMessageS2CExecResultPayload{ + Score: nil, + Status: api.GamePlayerMessageS2CExecResultPayloadStatus(err.Status), + }, + } } - player.s2cMessages <- &playerMessageS2CExecResult{ - Type: playerMessageTypeS2CExecResult, - Data: playerMessageS2CExecResultPayload{ - Score: nil, - Status: api.GamePlayerMessageS2CExecResultPayloadStatus(taskResult.Status), - }, + // TODO: broadcast to watchers + } + case *taskqueue.TaskResultRunTestcase: + err := hub.processTaskResultRunTestcase(taskResult) + if err != nil { + for player := range hub.players { + if player.playerID != taskResult.TaskPayload.UserID() { + continue + } + player.s2cMessages <- &playerMessageS2CExecResult{ + Type: playerMessageTypeS2CExecResult, + Data: playerMessageS2CExecResultPayload{ + Score: nil, + Status: api.GamePlayerMessageS2CExecResultPayloadStatus(err.Status), + }, + } } + // TODO: broadcast to watchers } - // broadcast to watchers + // TODO: aggregate results of testcases default: panic("unexpected task result type") } } } +func (hub *gameHub) processTaskResultCreateSubmissionRecord( + taskResult *taskqueue.TaskResultCreateSubmissionRecord, +) *codeSubmissionError { + if taskResult.Err != nil { + return &codeSubmissionError{ + Status: "internal_error", + Stderr: taskResult.Err.Error(), + } + } + + if err := hub.taskQueue.EnqueueTaskCompileSwiftToWasm( + taskResult.TaskPayload.GameID(), + taskResult.TaskPayload.UserID(), + taskResult.TaskPayload.Code(), + taskResult.SubmissionID, + ); err != nil { + return &codeSubmissionError{ + Status: "internal_error", + Stderr: err.Error(), + } + } + return nil +} + +func (hub *gameHub) processTaskResultCompileSwiftToWasm( + taskResult *taskqueue.TaskResultCompileSwiftToWasm, +) *codeSubmissionError { + if taskResult.Err != nil { + return &codeSubmissionError{ + Status: "internal_error", + Stderr: taskResult.Err.Error(), + } + } + + if taskResult.Status != "success" { + if err := hub.q.CreateSubmissionResult(hub.ctx, db.CreateSubmissionResultParams{ + SubmissionID: int32(taskResult.TaskPayload.SubmissionID), + Status: taskResult.Status, + Stdout: taskResult.Stdout, + Stderr: taskResult.Stderr, + }); err != nil { + return &codeSubmissionError{ + Status: "internal_error", + Stderr: err.Error(), + } + } + return &codeSubmissionError{ + Status: taskResult.Status, + Stdout: taskResult.Stdout, + Stderr: taskResult.Stderr, + } + } + if err := hub.taskQueue.EnqueueTaskCompileWasmToNativeExecutable( + taskResult.TaskPayload.GameID(), + taskResult.TaskPayload.UserID(), + taskResult.TaskPayload.Code(), + taskResult.TaskPayload.SubmissionID, + ); err != nil { + return &codeSubmissionError{ + Status: "internal_error", + Stderr: err.Error(), + } + } + return nil +} + +func (hub *gameHub) processTaskResultCompileWasmToNativeExecutable( + taskResult *taskqueue.TaskResultCompileWasmToNativeExecutable, +) *codeSubmissionError { + if taskResult.Err != nil { + return &codeSubmissionError{ + Status: "internal_error", + Stderr: taskResult.Err.Error(), + } + } + + if taskResult.Status != "success" { + if err := hub.q.CreateSubmissionResult(hub.ctx, db.CreateSubmissionResultParams{ + SubmissionID: int32(taskResult.TaskPayload.SubmissionID), + Status: taskResult.Status, + Stdout: taskResult.Stdout, + Stderr: taskResult.Stderr, + }); err != nil { + return &codeSubmissionError{ + Status: "internal_error", + Stderr: err.Error(), + } + } + return &codeSubmissionError{ + Status: taskResult.Status, + Stdout: taskResult.Stdout, + Stderr: taskResult.Stderr, + } + } + + testcases, err := hub.q.ListTestcasesByGameID(hub.ctx, int32(taskResult.TaskPayload.GameID())) + if err != nil { + return &codeSubmissionError{ + Status: "internal_error", + Stderr: err.Error(), + } + } + if len(testcases) == 0 { + return &codeSubmissionError{ + Status: "internal_error", + Stderr: "no testcases found", + } + } + + for _, testcase := range testcases { + if err := hub.taskQueue.EnqueueTaskRunTestcase( + taskResult.TaskPayload.GameID(), + taskResult.TaskPayload.UserID(), + taskResult.TaskPayload.Code(), + taskResult.TaskPayload.SubmissionID, + int(testcase.TestcaseID), + testcase.Stdin, + testcase.Stdout, + ); err != nil { + return &codeSubmissionError{ + Status: "internal_error", + Stderr: err.Error(), + } + } + } + return nil +} + +func (hub *gameHub) processTaskResultRunTestcase( + taskResult *taskqueue.TaskResultRunTestcase, +) *codeSubmissionError { + if taskResult.Err != nil { + return &codeSubmissionError{ + Status: "internal_error", + Stderr: taskResult.Err.Error(), + } + } + + if taskResult.Status != "success" { + if err := hub.q.CreateTestcaseResult(hub.ctx, db.CreateTestcaseResultParams{ + SubmissionID: int32(taskResult.TaskPayload.SubmissionID), + TestcaseID: int32(taskResult.TaskPayload.TestcaseID), + Status: taskResult.Status, + Stdout: taskResult.Stdout, + Stderr: taskResult.Stderr, + }); err != nil { + return &codeSubmissionError{ + Status: "internal_error", + Stderr: err.Error(), + } + } + return &codeSubmissionError{ + Status: taskResult.Status, + Stdout: taskResult.Stdout, + Stderr: taskResult.Stderr, + } + } + if !isTestcaseResultCorrect(taskResult.TaskPayload.Stdout, taskResult.Stdout) { + if err := hub.q.CreateTestcaseResult(hub.ctx, db.CreateTestcaseResultParams{ + SubmissionID: int32(taskResult.TaskPayload.SubmissionID), + TestcaseID: int32(taskResult.TaskPayload.TestcaseID), + Status: "wrong_answer", + Stdout: taskResult.Stdout, + Stderr: taskResult.Stderr, + }); err != nil { + return &codeSubmissionError{ + Status: "internal_error", + Stderr: err.Error(), + } + } + return &codeSubmissionError{ + Status: "wrong_answer", + Stdout: taskResult.Stdout, + Stderr: taskResult.Stderr, + } + } + return nil +} + func (hub *gameHub) startGame() error { for player := range hub.players { player.s2cMessages <- &playerMessageS2CPrepare{ @@ -376,3 +614,9 @@ func (hubs *GameHubs) StartGame(gameID int) error { } return hub.startGame() } + +func isTestcaseResultCorrect(expectedStdout, actualStdout string) bool { + expectedStdout = strings.TrimSpace(expectedStdout) + actualStdout = strings.TrimSpace(actualStdout) + return actualStdout == expectedStdout +} diff --git a/backend/taskqueue/processor.go b/backend/taskqueue/processor.go index d771e61..ba35a1b 100644 --- a/backend/taskqueue/processor.go +++ b/backend/taskqueue/processor.go @@ -6,7 +6,6 @@ import ( "encoding/json" "fmt" "net/http" - "strings" "github.com/nsfisis/iosdc-japan-2024-albatross/backend/db" ) @@ -46,6 +45,7 @@ func (p *processor) doProcessTaskCompileSwiftToWasm( ctx context.Context, payload *TaskPayloadCompileSwiftToWasm, ) (*TaskResultCompileSwiftToWasm, error) { + _ = ctx type swiftcRequestData struct { MaxDuration int `json:"max_duration_ms"` Code string `json:"code"` @@ -71,27 +71,9 @@ func (p *processor) doProcessTaskCompileSwiftToWasm( if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { return nil, fmt.Errorf("json.Decode failed: %v", err) } - if resData.Status != "success" { - err := p.q.CreateSubmissionResult(ctx, db.CreateSubmissionResultParams{ - SubmissionID: int32(payload.SubmissionID), - Status: "compile_error", - Stdout: resData.Stdout, - Stderr: resData.Stderr, - }) - if err != nil { - return nil, fmt.Errorf("CreateTestcaseResult failed: %v", err) - } - return &TaskResultCompileSwiftToWasm{ - TaskPayload: payload, - Status: "compile_error", - Stdout: resData.Stdout, - Stderr: resData.Stderr, - }, nil - } - return &TaskResultCompileSwiftToWasm{ TaskPayload: payload, - Status: "success", + Status: resData.Status, Stdout: resData.Stdout, Stderr: resData.Stderr, }, nil @@ -101,6 +83,7 @@ func (p *processor) doProcessTaskCompileWasmToNativeExecutable( ctx context.Context, payload *TaskPayloadCompileWasmToNativeExecutable, ) (*TaskResultCompileWasmToNativeExecutable, error) { + _ = ctx type wasmcRequestData struct { MaxDuration int `json:"max_duration_ms"` Code string `json:"code"` @@ -126,27 +109,9 @@ func (p *processor) doProcessTaskCompileWasmToNativeExecutable( if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { return nil, fmt.Errorf("json.Decode failed: %v", err) } - if resData.Status != "success" { - err := p.q.CreateSubmissionResult(ctx, db.CreateSubmissionResultParams{ - SubmissionID: int32(payload.SubmissionID), - Status: "compile_error", - Stdout: resData.Stdout, - Stderr: resData.Stderr, - }) - if err != nil { - return nil, fmt.Errorf("CreateTestcaseResult failed: %v", err) - } - return &TaskResultCompileWasmToNativeExecutable{ - TaskPayload: payload, - Status: "compile_error", - Stdout: resData.Stdout, - Stderr: resData.Stderr, - }, nil - } - return &TaskResultCompileWasmToNativeExecutable{ TaskPayload: payload, - Status: "success", + Status: resData.Status, Stdout: resData.Stdout, Stderr: resData.Stderr, }, nil @@ -156,75 +121,40 @@ func (p *processor) doProcessTaskRunTestcase( ctx context.Context, payload *TaskPayloadRunTestcase, ) (*TaskResultRunTestcase, error) { - testcases, err := p.q.ListTestcasesByGameID(ctx, int32(payload.GameID())) + type testrunRequestData struct { + MaxDuration int `json:"max_duration_ms"` + Code string `json:"code"` + Stdin string `json:"stdin"` + } + type testrunResponseData struct { + Status string `json:"status"` + Stdout string `json:"stdout"` + Stderr string `json:"stderr"` + } + reqData := testrunRequestData{ + MaxDuration: 5000, + Code: payload.Code(), + Stdin: payload.Stdin, + } + reqJson, err := json.Marshal(reqData) if err != nil { - return nil, fmt.Errorf("ListTestcasesByGameID failed: %v", err) + return nil, fmt.Errorf("json.Marshal failed: %v", err) } - - for _, testcase := range testcases { - type testrunRequestData struct { - MaxDuration int `json:"max_duration_ms"` - Code string `json:"code"` - Stdin string `json:"stdin"` - } - type testrunResponseData struct { - Status string `json:"status"` - Stdout string `json:"stdout"` - Stderr string `json:"stderr"` - } - reqData := testrunRequestData{ - MaxDuration: 5000, - Code: payload.Code(), - Stdin: testcase.Stdin, - } - reqJson, err := json.Marshal(reqData) - if err != nil { - return nil, fmt.Errorf("json.Marshal failed: %v", err) - } - res, err := http.Post("http://worker:80/api/testrun", "application/json", bytes.NewBuffer(reqJson)) - if err != nil { - return nil, fmt.Errorf("http.Post failed: %v", err) - } - resData := testrunResponseData{} - if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { - return nil, fmt.Errorf("json.Decode failed: %v", err) - } - if resData.Status != "success" { - err := p.q.CreateTestcaseResult(ctx, db.CreateTestcaseResultParams{ - SubmissionID: int32(payload.SubmissionID), - TestcaseID: testcase.TestcaseID, - Status: resData.Status, - Stdout: resData.Stdout, - Stderr: resData.Stderr, - }) - if err != nil { - return nil, fmt.Errorf("CreateTestcaseResult failed: %v", err) - } - return &TaskResultRunTestcase{ - TaskPayload: payload, - Status: resData.Status, - Stdout: resData.Stdout, - Stderr: resData.Stderr, - }, nil - } - if !isTestcaseResultCorrect(testcase.Stdout, resData.Stdout) { - err := p.q.CreateTestcaseResult(ctx, db.CreateTestcaseResultParams{ - SubmissionID: int32(payload.SubmissionID), - TestcaseID: testcase.TestcaseID, - Status: "wrong_answer", - Stdout: resData.Stdout, - Stderr: resData.Stderr, - }) - if err != nil { - return nil, fmt.Errorf("CreateTestcaseResult failed: %v", err) - } - return &TaskResultRunTestcase{ - TaskPayload: payload, - Status: "wrong_answer", - Stdout: resData.Stdout, - Stderr: resData.Stderr, - }, nil - } + res, err := http.Post("http://worker:80/api/testrun", "application/json", bytes.NewBuffer(reqJson)) + if err != nil { + return nil, fmt.Errorf("http.Post failed: %v", err) + } + resData := testrunResponseData{} + if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { + return nil, fmt.Errorf("json.Decode failed: %v", err) + } + if resData.Status != "success" { + return &TaskResultRunTestcase{ + TaskPayload: payload, + Status: resData.Status, + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }, nil } return &TaskResultRunTestcase{ @@ -232,9 +162,3 @@ func (p *processor) doProcessTaskRunTestcase( Status: "success", }, nil } - -func isTestcaseResultCorrect(expectedStdout, actualStdout string) bool { - expectedStdout = strings.TrimSpace(expectedStdout) - actualStdout = strings.TrimSpace(actualStdout) - return actualStdout == expectedStdout -} diff --git a/backend/taskqueue/queue.go b/backend/taskqueue/queue.go index b7d7381..515a406 100644 --- a/backend/taskqueue/queue.go +++ b/backend/taskqueue/queue.go @@ -83,6 +83,8 @@ func (q *Queue) EnqueueTaskRunTestcase( code string, submissionID int, testcaseID int, + stdin string, + stdout string, ) error { task, err := newTaskRunTestcase( gameID, @@ -90,6 +92,8 @@ func (q *Queue) EnqueueTaskRunTestcase( code, submissionID, testcaseID, + stdin, + stdout, ) if err != nil { return err diff --git a/backend/taskqueue/tasks.go b/backend/taskqueue/tasks.go index 990ce65..cbe83b1 100644 --- a/backend/taskqueue/tasks.go +++ b/backend/taskqueue/tasks.go @@ -112,6 +112,8 @@ type TaskPayloadRunTestcase struct { TaskPayloadBase SubmissionID int TestcaseID int + Stdin string + Stdout string } func newTaskRunTestcase( @@ -120,6 +122,8 @@ func newTaskRunTestcase( code string, submissionID int, testcaseID int, + stdin string, + stdout string, ) (*asynq.Task, error) { payload, err := json.Marshal(TaskPayloadRunTestcase{ TaskPayloadBase: TaskPayloadBase{ @@ -129,6 +133,8 @@ func newTaskRunTestcase( }, SubmissionID: submissionID, TestcaseID: testcaseID, + Stdin: stdin, + Stdout: stdout, }) if err != nil { return nil, err -- cgit v1.2.3-70-g09d2 From 113c83b19acc58fbd46e8acdac67ff1a112d0d8c Mon Sep 17 00:00:00 2001 From: nsfisis Date: Thu, 8 Aug 2024 19:30:14 +0900 Subject: fix(backend): fix an issue where stdout and stderr of testcase results are discarded --- backend/taskqueue/processor.go | 21 ++++++--------------- 1 file changed, 6 insertions(+), 15 deletions(-) (limited to 'backend/taskqueue/processor.go') diff --git a/backend/taskqueue/processor.go b/backend/taskqueue/processor.go index ba35a1b..b64b01c 100644 --- a/backend/taskqueue/processor.go +++ b/backend/taskqueue/processor.go @@ -42,10 +42,9 @@ func (p *processor) doProcessTaskCreateSubmissionRecord( } func (p *processor) doProcessTaskCompileSwiftToWasm( - ctx context.Context, + _ context.Context, payload *TaskPayloadCompileSwiftToWasm, ) (*TaskResultCompileSwiftToWasm, error) { - _ = ctx type swiftcRequestData struct { MaxDuration int `json:"max_duration_ms"` Code string `json:"code"` @@ -80,10 +79,9 @@ func (p *processor) doProcessTaskCompileSwiftToWasm( } func (p *processor) doProcessTaskCompileWasmToNativeExecutable( - ctx context.Context, + _ context.Context, payload *TaskPayloadCompileWasmToNativeExecutable, ) (*TaskResultCompileWasmToNativeExecutable, error) { - _ = ctx type wasmcRequestData struct { MaxDuration int `json:"max_duration_ms"` Code string `json:"code"` @@ -118,7 +116,7 @@ func (p *processor) doProcessTaskCompileWasmToNativeExecutable( } func (p *processor) doProcessTaskRunTestcase( - ctx context.Context, + _ context.Context, payload *TaskPayloadRunTestcase, ) (*TaskResultRunTestcase, error) { type testrunRequestData struct { @@ -148,17 +146,10 @@ func (p *processor) doProcessTaskRunTestcase( if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { return nil, fmt.Errorf("json.Decode failed: %v", err) } - if resData.Status != "success" { - return &TaskResultRunTestcase{ - TaskPayload: payload, - Status: resData.Status, - Stdout: resData.Stdout, - Stderr: resData.Stderr, - }, nil - } - return &TaskResultRunTestcase{ TaskPayload: payload, - Status: "success", + Status: resData.Status, + Stdout: resData.Stdout, + Stderr: resData.Stderr, }, nil } -- cgit v1.2.3-70-g09d2