aboutsummaryrefslogtreecommitdiffhomepage
path: root/backend
diff options
context:
space:
mode:
authornsfisis <nsfisis@gmail.com>2024-08-05 05:09:41 +0900
committernsfisis <nsfisis@gmail.com>2024-08-05 05:14:30 +0900
commit2fc239b3f4d49f1a257523df7c7781a2141252bf (patch)
tree5b5c826e39dbc560bea29461e65a1f712382e19d /backend
parent1325626397703567828a903a82fc1ed8626dbad4 (diff)
downloadphperkaigi-2025-albatross-2fc239b3f4d49f1a257523df7c7781a2141252bf.tar.gz
phperkaigi-2025-albatross-2fc239b3f4d49f1a257523df7c7781a2141252bf.tar.zst
phperkaigi-2025-albatross-2fc239b3f4d49f1a257523df7c7781a2141252bf.zip
feat(backend): implement task queue processor
Diffstat (limited to 'backend')
-rw-r--r--backend/db/query.sql.go79
-rw-r--r--backend/main.go2
-rw-r--r--backend/query.sql13
-rw-r--r--backend/taskqueue/processor.go176
-rw-r--r--backend/taskqueue/worker_server.go9
5 files changed, 271 insertions, 8 deletions
diff --git a/backend/db/query.sql.go b/backend/db/query.sql.go
index 8df3bf5..89506d0 100644
--- a/backend/db/query.sql.go
+++ b/backend/db/query.sql.go
@@ -11,6 +11,55 @@ import (
"github.com/jackc/pgx/v5/pgtype"
)
+const createSubmission = `-- name: CreateSubmission :one
+INSERT INTO submissions (game_id, user_id, code, code_size)
+VALUES ($1, $2, $3, $4)
+RETURNING submission_id
+`
+
+type CreateSubmissionParams struct {
+ GameID int32
+ UserID int32
+ Code string
+ CodeSize int32
+}
+
+func (q *Queries) CreateSubmission(ctx context.Context, arg CreateSubmissionParams) (int32, error) {
+ row := q.db.QueryRow(ctx, createSubmission,
+ arg.GameID,
+ arg.UserID,
+ arg.Code,
+ arg.CodeSize,
+ )
+ var submission_id int32
+ err := row.Scan(&submission_id)
+ return submission_id, err
+}
+
+const createTestcaseExecution = `-- name: CreateTestcaseExecution :exec
+INSERT INTO testcase_executions (submission_id, testcase_id, status, stdout, stderr)
+VALUES ($1, $2, $3, $4, $5)
+`
+
+type CreateTestcaseExecutionParams struct {
+ SubmissionID int32
+ TestcaseID *int32
+ Status string
+ Stdout string
+ Stderr string
+}
+
+func (q *Queries) CreateTestcaseExecution(ctx context.Context, arg CreateTestcaseExecutionParams) error {
+ _, err := q.db.Exec(ctx, createTestcaseExecution,
+ arg.SubmissionID,
+ arg.TestcaseID,
+ arg.Status,
+ arg.Stdout,
+ arg.Stderr,
+ )
+ return err
+}
+
const getGameByID = `-- name: GetGameByID :one
SELECT game_id, game_type, state, display_name, duration_seconds, created_at, started_at, games.problem_id, problems.problem_id, title, description FROM games
LEFT JOIN problems ON games.problem_id = problems.problem_id
@@ -263,6 +312,36 @@ func (q *Queries) ListGamesForPlayer(ctx context.Context, userID int32) ([]ListG
return items, nil
}
+const listTestcasesByGameID = `-- name: ListTestcasesByGameID :many
+SELECT testcase_id, problem_id, stdin, stdout FROM testcases
+WHERE testcases.problem_id = (SELECT problem_id FROM games WHERE game_id = $1)
+`
+
+func (q *Queries) ListTestcasesByGameID(ctx context.Context, gameID int32) ([]Testcase, error) {
+ rows, err := q.db.Query(ctx, listTestcasesByGameID, gameID)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ var items []Testcase
+ for rows.Next() {
+ var i Testcase
+ if err := rows.Scan(
+ &i.TestcaseID,
+ &i.ProblemID,
+ &i.Stdin,
+ &i.Stdout,
+ ); err != nil {
+ return nil, err
+ }
+ items = append(items, i)
+ }
+ if err := rows.Err(); err != nil {
+ return nil, err
+ }
+ return items, nil
+}
+
const listUsers = `-- name: ListUsers :many
SELECT user_id, username, display_name, icon_path, is_admin, created_at FROM users
`
diff --git a/backend/main.go b/backend/main.go
index a213c6c..7cfbe2f 100644
--- a/backend/main.go
+++ b/backend/main.go
@@ -61,7 +61,7 @@ func main() {
e.Use(middleware.Recover())
taskQueue := taskqueue.NewQueue("task-db:6379")
- workerServer := taskqueue.NewWorkerServer("task-db:6379")
+ workerServer := taskqueue.NewWorkerServer("task-db:6379", queries)
gameHubs := game.NewGameHubs(queries, taskQueue)
err = gameHubs.RestoreFromDB(ctx)
diff --git a/backend/query.sql b/backend/query.sql
index 245d5cf..6395b9b 100644
--- a/backend/query.sql
+++ b/backend/query.sql
@@ -53,3 +53,16 @@ SET
started_at = $6,
problem_id = $7
WHERE game_id = $1;
+
+-- name: CreateSubmission :one
+INSERT INTO submissions (game_id, user_id, code, code_size)
+VALUES ($1, $2, $3, $4)
+RETURNING submission_id;
+
+-- name: ListTestcasesByGameID :many
+SELECT * FROM testcases
+WHERE testcases.problem_id = (SELECT problem_id FROM games WHERE game_id = $1);
+
+-- name: CreateTestcaseExecution :exec
+INSERT INTO testcase_executions (submission_id, testcase_id, status, stdout, stderr)
+VALUES ($1, $2, $3, $4, $5);
diff --git a/backend/taskqueue/processor.go b/backend/taskqueue/processor.go
index e26ac64..1105da5 100644
--- a/backend/taskqueue/processor.go
+++ b/backend/taskqueue/processor.go
@@ -1,25 +1,193 @@
package taskqueue
import (
+ "bytes"
"context"
"encoding/json"
"fmt"
+ "net/http"
+ "strings"
"github.com/hibiken/asynq"
+
+ "github.com/nsfisis/iosdc-japan-2024-albatross/backend/db"
)
type ExecProcessor struct {
+ q *db.Queries
+}
+
+func NewExecProcessor(q *db.Queries) *ExecProcessor {
+ return &ExecProcessor{
+ q: q,
+ }
}
-func (processor *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
+func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
var payload TaskExecPlayload
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
}
- // TODO
+
+ // TODO: upsert
+ // Create submission record.
+ submissionID, err := p.q.CreateSubmission(ctx, db.CreateSubmissionParams{
+ GameID: int32(payload.GameID),
+ UserID: int32(payload.UserID),
+ Code: payload.Code,
+ CodeSize: int32(len(payload.Code)), // TODO: exclude whitespaces.
+ })
+ if err != nil {
+ return fmt.Errorf("CreateSubmission failed: %v", err)
+ }
+
+ {
+ type swiftcRequestData struct {
+ MaxDuration int `json:"max_duration_ms"`
+ Code string `json:"code"`
+ }
+ type swiftcResponseData struct {
+ Result string `json:"result"`
+ Stdout string `json:"stdout"`
+ Stderr string `json:"stderr"`
+ }
+ reqData := swiftcRequestData{
+ MaxDuration: 5000,
+ Code: payload.Code,
+ }
+ reqJson, err := json.Marshal(reqData)
+ if err != nil {
+ return fmt.Errorf("json.Marshal failed: %v", err)
+ }
+ res, err := http.Post("http://worker:80/api/swiftc", "application/json", bytes.NewBuffer(reqJson))
+ if err != nil {
+ return fmt.Errorf("http.Post failed: %v", err)
+ }
+ resData := swiftcResponseData{}
+ if err := json.NewDecoder(res.Body).Decode(&resData); err != nil {
+ return fmt.Errorf("json.Decode failed: %v", err)
+ }
+ if resData.Result != "success" {
+ err := p.q.CreateTestcaseExecution(ctx, db.CreateTestcaseExecutionParams{
+ SubmissionID: submissionID,
+ TestcaseID: nil,
+ Status: "compile_error",
+ Stdout: resData.Stdout,
+ Stderr: resData.Stderr,
+ })
+ if err != nil {
+ return fmt.Errorf("CreateTestcaseExecution failed: %v", err)
+ }
+ return fmt.Errorf("swiftc failed: %v", resData.Stderr)
+ }
+ }
+ {
+ type wasmcRequestData struct {
+ MaxDuration int `json:"max_duration_ms"`
+ Code string `json:"code"`
+ }
+ type wasmcResponseData struct {
+ Result string `json:"result"`
+ Stdout string `json:"stdout"`
+ Stderr string `json:"stderr"`
+ }
+ reqData := wasmcRequestData{
+ MaxDuration: 5000,
+ Code: payload.Code,
+ }
+ reqJson, err := json.Marshal(reqData)
+ if err != nil {
+ return fmt.Errorf("json.Marshal failed: %v", err)
+ }
+ res, err := http.Post("http://worker:80/api/wasmc", "application/json", bytes.NewBuffer(reqJson))
+ if err != nil {
+ return fmt.Errorf("http.Post failed: %v", err)
+ }
+ resData := wasmcResponseData{}
+ if err := json.NewDecoder(res.Body).Decode(&resData); err != nil {
+ return fmt.Errorf("json.Decode failed: %v", err)
+ }
+ if resData.Result != "success" {
+ err := p.q.CreateTestcaseExecution(ctx, db.CreateTestcaseExecutionParams{
+ SubmissionID: submissionID,
+ TestcaseID: nil,
+ Status: "compile_error",
+ Stdout: resData.Stdout,
+ Stderr: resData.Stderr,
+ })
+ if err != nil {
+ return fmt.Errorf("CreateTestcaseExecution failed: %v", err)
+ }
+ return fmt.Errorf("wasmc failed: %v", resData.Stderr)
+ }
+ }
+
+ testcases, err := p.q.ListTestcasesByGameID(ctx, int32(payload.GameID))
+ if err != nil {
+ return fmt.Errorf("ListTestcasesByGameID failed: %v", err)
+ }
+
+ for _, testcase := range testcases {
+ type testrunRequestData struct {
+ MaxDuration int `json:"max_duration_ms"`
+ Code string `json:"code"`
+ Stdin string `json:"stdin"`
+ }
+ type testrunResponseData struct {
+ Result string `json:"result"`
+ Stdout string `json:"stdout"`
+ Stderr string `json:"stderr"`
+ }
+ reqData := testrunRequestData{
+ MaxDuration: 5000,
+ Code: payload.Code,
+ Stdin: testcase.Stdin,
+ }
+ reqJson, err := json.Marshal(reqData)
+ if err != nil {
+ return fmt.Errorf("json.Marshal failed: %v", err)
+ }
+ res, err := http.Post("http://worker:80/api/testrun", "application/json", bytes.NewBuffer(reqJson))
+ if err != nil {
+ return fmt.Errorf("http.Post failed: %v", err)
+ }
+ resData := testrunResponseData{}
+ if err := json.NewDecoder(res.Body).Decode(&resData); err != nil {
+ return fmt.Errorf("json.Decode failed: %v", err)
+ }
+ if resData.Result != "success" {
+ err := p.q.CreateTestcaseExecution(ctx, db.CreateTestcaseExecutionParams{
+ SubmissionID: submissionID,
+ TestcaseID: &testcase.TestcaseID,
+ Status: resData.Result,
+ Stdout: resData.Stdout,
+ Stderr: resData.Stderr,
+ })
+ if err != nil {
+ return fmt.Errorf("CreateTestcaseExecution failed: %v", err)
+ }
+ return fmt.Errorf("testrun failed: %v", resData.Stderr)
+ }
+ if isTestcaseExecutionCorrect(testcase.Stdout, resData.Stdout) {
+ err := p.q.CreateTestcaseExecution(ctx, db.CreateTestcaseExecutionParams{
+ SubmissionID: submissionID,
+ TestcaseID: &testcase.TestcaseID,
+ Status: "wrong_answer",
+ Stdout: resData.Stdout,
+ Stderr: resData.Stderr,
+ })
+ if err != nil {
+ return fmt.Errorf("CreateTestcaseExecution failed: %v", err)
+ }
+ return fmt.Errorf("testrun failed: %v", resData.Stdout)
+ }
+ }
+
return nil
}
-func NewExecProcessor() *ExecProcessor {
- return &ExecProcessor{}
+func isTestcaseExecutionCorrect(expectedStdout, actualStdout string) bool {
+ expectedStdout = strings.TrimSpace(expectedStdout)
+ actualStdout = strings.TrimSpace(actualStdout)
+ return actualStdout == expectedStdout
}
diff --git a/backend/taskqueue/worker_server.go b/backend/taskqueue/worker_server.go
index 9bdd81f..09d9761 100644
--- a/backend/taskqueue/worker_server.go
+++ b/backend/taskqueue/worker_server.go
@@ -2,13 +2,16 @@ package taskqueue
import (
"github.com/hibiken/asynq"
+
+ "github.com/nsfisis/iosdc-japan-2024-albatross/backend/db"
)
type WorkerServer struct {
- server *asynq.Server
+ server *asynq.Server
+ queries *db.Queries
}
-func NewWorkerServer(redisAddr string) *WorkerServer {
+func NewWorkerServer(redisAddr string, queries *db.Queries) *WorkerServer {
return &WorkerServer{
server: asynq.NewServer(
asynq.RedisClientOpt{
@@ -21,7 +24,7 @@ func NewWorkerServer(redisAddr string) *WorkerServer {
func (s *WorkerServer) Run() error {
mux := asynq.NewServeMux()
- mux.Handle(TaskTypeExec, NewExecProcessor())
+ mux.Handle(TaskTypeExec, NewExecProcessor(s.queries))
return s.server.Run(mux)
}