diff options
Diffstat (limited to 'backend/taskqueue')
| -rw-r--r-- | backend/taskqueue/processor.go | 199 | ||||
| -rw-r--r-- | backend/taskqueue/queue.go | 25 | ||||
| -rw-r--r-- | backend/taskqueue/tasks.go | 29 | ||||
| -rw-r--r-- | backend/taskqueue/worker_server.go | 33 |
4 files changed, 286 insertions, 0 deletions
diff --git a/backend/taskqueue/processor.go b/backend/taskqueue/processor.go new file mode 100644 index 0000000..e505c5a --- /dev/null +++ b/backend/taskqueue/processor.go @@ -0,0 +1,199 @@ +package taskqueue + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + + "github.com/hibiken/asynq" + + "github.com/nsfisis/iosdc-japan-2024-albatross/backend/db" +) + +type ExecProcessor struct { + q *db.Queries + c chan string +} + +func NewExecProcessor(q *db.Queries, c chan string) *ExecProcessor { + return &ExecProcessor{ + q: q, + c: c, + } +} + +func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { + var payload TaskExecPlayload + if err := json.Unmarshal(t.Payload(), &payload); err != nil { + return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) + } + + // TODO: upsert + // Create submission record. + submissionID, err := p.q.CreateSubmission(ctx, db.CreateSubmissionParams{ + GameID: int32(payload.GameID), + UserID: int32(payload.UserID), + Code: payload.Code, + CodeSize: int32(len(payload.Code)), // TODO: exclude whitespaces. + }) + if err != nil { + return fmt.Errorf("CreateSubmission failed: %v", err) + } + + { + type swiftcRequestData struct { + MaxDuration int `json:"max_duration_ms"` + Code string `json:"code"` + } + type swiftcResponseData struct { + Result string `json:"result"` + Stdout string `json:"stdout"` + Stderr string `json:"stderr"` + } + reqData := swiftcRequestData{ + MaxDuration: 5000, + Code: payload.Code, + } + reqJson, err := json.Marshal(reqData) + if err != nil { + return fmt.Errorf("json.Marshal failed: %v", err) + } + res, err := http.Post("http://worker:80/api/swiftc", "application/json", bytes.NewBuffer(reqJson)) + if err != nil { + return fmt.Errorf("http.Post failed: %v", err) + } + resData := swiftcResponseData{} + if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { + return fmt.Errorf("json.Decode failed: %v", err) + } + if resData.Result != "success" { + err := p.q.CreateTestcaseExecution(ctx, db.CreateTestcaseExecutionParams{ + SubmissionID: submissionID, + TestcaseID: nil, + Status: "compile_error", + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }) + if err != nil { + return fmt.Errorf("CreateTestcaseExecution failed: %v", err) + } + p.c <- "compile_error" + return fmt.Errorf("swiftc failed: %v", resData.Stderr) + } + } + { + type wasmcRequestData struct { + MaxDuration int `json:"max_duration_ms"` + Code string `json:"code"` + } + type wasmcResponseData struct { + Result string `json:"result"` + Stdout string `json:"stdout"` + Stderr string `json:"stderr"` + } + reqData := wasmcRequestData{ + MaxDuration: 5000, + Code: payload.Code, + } + reqJson, err := json.Marshal(reqData) + if err != nil { + return fmt.Errorf("json.Marshal failed: %v", err) + } + res, err := http.Post("http://worker:80/api/wasmc", "application/json", bytes.NewBuffer(reqJson)) + if err != nil { + return fmt.Errorf("http.Post failed: %v", err) + } + resData := wasmcResponseData{} + if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { + return fmt.Errorf("json.Decode failed: %v", err) + } + if resData.Result != "success" { + err := p.q.CreateTestcaseExecution(ctx, db.CreateTestcaseExecutionParams{ + SubmissionID: submissionID, + TestcaseID: nil, + Status: "compile_error", + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }) + if err != nil { + return fmt.Errorf("CreateTestcaseExecution failed: %v", err) + } + p.c <- "compile_error" + return fmt.Errorf("wasmc failed: %v", resData.Stderr) + } + } + + testcases, err := p.q.ListTestcasesByGameID(ctx, int32(payload.GameID)) + if err != nil { + return fmt.Errorf("ListTestcasesByGameID failed: %v", err) + } + + for _, testcase := range testcases { + type testrunRequestData struct { + MaxDuration int `json:"max_duration_ms"` + Code string `json:"code"` + Stdin string `json:"stdin"` + } + type testrunResponseData struct { + Result string `json:"result"` + Stdout string `json:"stdout"` + Stderr string `json:"stderr"` + } + reqData := testrunRequestData{ + MaxDuration: 5000, + Code: payload.Code, + Stdin: testcase.Stdin, + } + reqJson, err := json.Marshal(reqData) + if err != nil { + return fmt.Errorf("json.Marshal failed: %v", err) + } + res, err := http.Post("http://worker:80/api/testrun", "application/json", bytes.NewBuffer(reqJson)) + if err != nil { + return fmt.Errorf("http.Post failed: %v", err) + } + resData := testrunResponseData{} + if err := json.NewDecoder(res.Body).Decode(&resData); err != nil { + return fmt.Errorf("json.Decode failed: %v", err) + } + if resData.Result != "success" { + err := p.q.CreateTestcaseExecution(ctx, db.CreateTestcaseExecutionParams{ + SubmissionID: submissionID, + TestcaseID: &testcase.TestcaseID, + Status: resData.Result, + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }) + if err != nil { + return fmt.Errorf("CreateTestcaseExecution failed: %v", err) + } + p.c <- resData.Result + return fmt.Errorf("testrun failed: %v", resData.Stderr) + } + if isTestcaseExecutionCorrect(testcase.Stdout, resData.Stdout) { + err := p.q.CreateTestcaseExecution(ctx, db.CreateTestcaseExecutionParams{ + SubmissionID: submissionID, + TestcaseID: &testcase.TestcaseID, + Status: "wrong_answer", + Stdout: resData.Stdout, + Stderr: resData.Stderr, + }) + if err != nil { + return fmt.Errorf("CreateTestcaseExecution failed: %v", err) + } + p.c <- "wrong_answer" + return fmt.Errorf("testrun failed: %v", resData.Stdout) + } + } + + return nil +} + +func isTestcaseExecutionCorrect(expectedStdout, actualStdout string) bool { + expectedStdout = strings.TrimSpace(expectedStdout) + actualStdout = strings.TrimSpace(actualStdout) + return actualStdout == expectedStdout +} diff --git a/backend/taskqueue/queue.go b/backend/taskqueue/queue.go new file mode 100644 index 0000000..53ec6d6 --- /dev/null +++ b/backend/taskqueue/queue.go @@ -0,0 +1,25 @@ +package taskqueue + +import ( + "github.com/hibiken/asynq" +) + +type Queue struct { + client *asynq.Client +} + +func NewQueue(redisAddr string) *Queue { + return &Queue{ + client: asynq.NewClient(asynq.RedisClientOpt{ + Addr: redisAddr, + }), + } +} + +func (q *Queue) Close() { + q.client.Close() +} + +func (q *Queue) Enqueue(task *asynq.Task, opts ...asynq.Option) (*asynq.TaskInfo, error) { + return q.client.Enqueue(task, opts...) +} diff --git a/backend/taskqueue/tasks.go b/backend/taskqueue/tasks.go new file mode 100644 index 0000000..cd67948 --- /dev/null +++ b/backend/taskqueue/tasks.go @@ -0,0 +1,29 @@ +package taskqueue + +import ( + "encoding/json" + + "github.com/hibiken/asynq" +) + +const ( + TaskTypeExec = "exec" +) + +type TaskExecPlayload struct { + GameID int + UserID int + Code string +} + +func NewExecTask(gameID, userID int, code string) (*asynq.Task, error) { + payload, err := json.Marshal(TaskExecPlayload{ + GameID: gameID, + UserID: userID, + Code: code, + }) + if err != nil { + return nil, err + } + return asynq.NewTask(TaskTypeExec, payload), nil +} diff --git a/backend/taskqueue/worker_server.go b/backend/taskqueue/worker_server.go new file mode 100644 index 0000000..485d6d3 --- /dev/null +++ b/backend/taskqueue/worker_server.go @@ -0,0 +1,33 @@ +package taskqueue + +import ( + "github.com/hibiken/asynq" + + "github.com/nsfisis/iosdc-japan-2024-albatross/backend/db" +) + +type WorkerServer struct { + server *asynq.Server + queries *db.Queries + c chan string +} + +func NewWorkerServer(redisAddr string, queries *db.Queries, c chan string) *WorkerServer { + return &WorkerServer{ + server: asynq.NewServer( + asynq.RedisClientOpt{ + Addr: redisAddr, + }, + asynq.Config{}, + ), + queries: queries, + c: c, + } +} + +func (s *WorkerServer) Run() error { + mux := asynq.NewServeMux() + mux.Handle(TaskTypeExec, NewExecProcessor(s.queries, s.c)) + + return s.server.Run(mux) +} |
