diff options
| author | nsfisis <nsfisis@gmail.com> | 2024-08-07 22:14:51 +0900 |
|---|---|---|
| committer | nsfisis <nsfisis@gmail.com> | 2024-08-08 04:02:37 +0900 |
| commit | 6bb6071ec1dce0cce59df0cb1c38168918061041 (patch) | |
| tree | feca06471f51ac5a1cdaaed4a0333ee9c0cf38e3 /backend | |
| parent | 401a28944fc0408811aedadd1c3104e2e2d4d7fe (diff) | |
| download | iosdc-japan-2025-albatross-6bb6071ec1dce0cce59df0cb1c38168918061041.tar.gz iosdc-japan-2025-albatross-6bb6071ec1dce0cce59df0cb1c38168918061041.tar.zst iosdc-japan-2025-albatross-6bb6071ec1dce0cce59df0cb1c38168918061041.zip | |
refactor(backend): move ownership of channel to send task results
Diffstat (limited to 'backend')
| -rw-r--r-- | backend/game/hub.go | 38 | ||||
| -rw-r--r-- | backend/main.go | 6 | ||||
| -rw-r--r-- | backend/taskqueue/processor.go | 35 | ||||
| -rw-r--r-- | backend/taskqueue/tasks.go | 7 | ||||
| -rw-r--r-- | backend/taskqueue/worker_server.go | 12 |
5 files changed, 65 insertions, 33 deletions
diff --git a/backend/game/hub.go b/backend/game/hub.go index 719b216..8afc802 100644 --- a/backend/game/hub.go +++ b/backend/game/hub.go @@ -33,7 +33,7 @@ type gameHub struct { watchers map[*watcherClient]bool registerWatcher chan *watcherClient unregisterWatcher chan *watcherClient - testcaseExecution chan string + testcaseExecution chan taskqueue.TaskExecResult } func newGameHub(ctx context.Context, game *game, q *db.Queries, taskQueue *taskqueue.Queue) *gameHub { @@ -49,7 +49,7 @@ func newGameHub(ctx context.Context, game *game, q *db.Queries, taskQueue *taskq watchers: make(map[*watcherClient]bool), registerWatcher: make(chan *watcherClient), unregisterWatcher: make(chan *watcherClient), - testcaseExecution: make(chan string), + testcaseExecution: make(chan taskqueue.TaskExecResult), } } @@ -169,14 +169,13 @@ func (hub *gameHub) run() { default: log.Printf("unexpected message type: %T", message.message) } - case executionStatus := <-hub.testcaseExecution: - _ = executionStatus + case executionResult := <-hub.testcaseExecution: for player := range hub.players { player.s2cMessages <- &playerMessageS2CExecResult{ Type: playerMessageTypeS2CExecResult, Data: playerMessageS2CExecResultPayload{ Score: nil, - Status: api.GamePlayerMessageS2CExecResultPayloadStatus(executionStatus), + Status: api.GamePlayerMessageS2CExecResultPayloadStatus(executionResult.Result), }, } } @@ -261,16 +260,18 @@ func (hub *gameHub) closeWatcherClient(watcher *watcherClient) { } type GameHubs struct { - hubs map[int]*gameHub - q *db.Queries - taskQueue *taskqueue.Queue + hubs map[int]*gameHub + q *db.Queries + taskQueue *taskqueue.Queue + taskResults chan taskqueue.TaskExecResult } -func NewGameHubs(q *db.Queries, taskQueue *taskqueue.Queue) *GameHubs { +func NewGameHubs(q *db.Queries, taskQueue *taskqueue.Queue, taskResults chan taskqueue.TaskExecResult) *GameHubs { return &GameHubs{ - hubs: make(map[int]*gameHub), - q: q, - taskQueue: taskQueue, + hubs: make(map[int]*gameHub), + q: q, + taskQueue: taskQueue, + taskResults: taskResults, } } @@ -328,6 +329,15 @@ func (hubs *GameHubs) Run() { for _, hub := range hubs.hubs { go hub.run() } + + for taskResult := range hubs.taskResults { + hub := hubs.getHub(taskResult.Task.GameID) + if hub == nil { + log.Printf("no such game: %d", taskResult.Task.GameID) + continue + } + hub.testcaseExecution <- taskResult + } } func (hubs *GameHubs) SockHandler() *sockHandler { @@ -341,7 +351,3 @@ func (hubs *GameHubs) StartGame(gameID int) error { } return hub.startGame() } - -func (hubs *GameHubs) C() chan string { - return hubs.hubs[7].testcaseExecution -} diff --git a/backend/main.go b/backend/main.go index e3d0052..62499d6 100644 --- a/backend/main.go +++ b/backend/main.go @@ -61,8 +61,9 @@ func main() { e.Use(middleware.Recover()) taskQueue := taskqueue.NewQueue("task-db:6379") + workerServer := taskqueue.NewWorkerServer("task-db:6379", queries) - gameHubs := game.NewGameHubs(queries, taskQueue) + gameHubs := game.NewGameHubs(queries, taskQueue, workerServer.Results()) err = gameHubs.RestoreFromDB(ctx) if err != nil { log.Fatalf("Error restoring game hubs from db %v", err) @@ -95,9 +96,8 @@ func main() { return c.Redirect(http.StatusPermanentRedirect, "http://localhost:5173/logout") }) - gameHubs.Run() + go gameHubs.Run() - workerServer := taskqueue.NewWorkerServer("task-db:6379", queries, gameHubs.C()) go func() { workerServer.Run() }() diff --git a/backend/taskqueue/processor.go b/backend/taskqueue/processor.go index 1d4c412..ca180e9 100644 --- a/backend/taskqueue/processor.go +++ b/backend/taskqueue/processor.go @@ -14,14 +14,14 @@ import ( ) type ExecProcessor struct { - q *db.Queries - c chan string + q *db.Queries + results chan TaskExecResult } -func NewExecProcessor(q *db.Queries, c chan string) *ExecProcessor { +func NewExecProcessor(q *db.Queries, results chan TaskExecResult) *ExecProcessor { return &ExecProcessor{ - q: q, - c: c, + q: q, + results: results, } } @@ -80,7 +80,10 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { if err != nil { return fmt.Errorf("CreateTestcaseExecution failed: %v", err) } - p.c <- "compile_error" + p.results <- TaskExecResult{ + Task: &payload, + Result: "compile_error", + } return fmt.Errorf("swiftc failed: %v", resData.Stderr) } } @@ -121,7 +124,10 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { if err != nil { return fmt.Errorf("CreateTestcaseExecution failed: %v", err) } - p.c <- "compile_error" + p.results <- TaskExecResult{ + Task: &payload, + Result: "compile_error", + } return fmt.Errorf("wasmc failed: %v", resData.Stderr) } } @@ -170,7 +176,10 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { if err != nil { return fmt.Errorf("CreateTestcaseExecution failed: %v", err) } - p.c <- resData.Result + p.results <- TaskExecResult{ + Task: &payload, + Result: resData.Result, + } return fmt.Errorf("testrun failed: %v", resData.Stderr) } if !isTestcaseExecutionCorrect(testcase.Stdout, resData.Stdout) { @@ -184,12 +193,18 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { if err != nil { return fmt.Errorf("CreateTestcaseExecution failed: %v", err) } - p.c <- "wrong_answer" + p.results <- TaskExecResult{ + Task: &payload, + Result: "wrong_answer", + } return fmt.Errorf("testrun failed: %v", resData.Stdout) } } - p.c <- "success" + p.results <- TaskExecResult{ + Task: &payload, + Result: "success", + } return nil } diff --git a/backend/taskqueue/tasks.go b/backend/taskqueue/tasks.go index cd67948..db05553 100644 --- a/backend/taskqueue/tasks.go +++ b/backend/taskqueue/tasks.go @@ -27,3 +27,10 @@ func NewExecTask(gameID, userID int, code string) (*asynq.Task, error) { } return asynq.NewTask(TaskTypeExec, payload), nil } + +type TaskExecResult struct { + Task *TaskExecPlayload + Result string + Stdout string + Stderr string +} diff --git a/backend/taskqueue/worker_server.go b/backend/taskqueue/worker_server.go index 485d6d3..67803e2 100644 --- a/backend/taskqueue/worker_server.go +++ b/backend/taskqueue/worker_server.go @@ -9,10 +9,10 @@ import ( type WorkerServer struct { server *asynq.Server queries *db.Queries - c chan string + results chan TaskExecResult } -func NewWorkerServer(redisAddr string, queries *db.Queries, c chan string) *WorkerServer { +func NewWorkerServer(redisAddr string, queries *db.Queries) *WorkerServer { return &WorkerServer{ server: asynq.NewServer( asynq.RedisClientOpt{ @@ -21,13 +21,17 @@ func NewWorkerServer(redisAddr string, queries *db.Queries, c chan string) *Work asynq.Config{}, ), queries: queries, - c: c, + results: make(chan TaskExecResult), } } func (s *WorkerServer) Run() error { mux := asynq.NewServeMux() - mux.Handle(TaskTypeExec, NewExecProcessor(s.queries, s.c)) + mux.Handle(TaskTypeExec, NewExecProcessor(s.queries, s.results)) return s.server.Run(mux) } + +func (s *WorkerServer) Results() chan TaskExecResult { + return s.results +} |
