aboutsummaryrefslogtreecommitdiffhomepage
path: root/backend
diff options
context:
space:
mode:
authornsfisis <nsfisis@gmail.com>2024-08-07 22:14:51 +0900
committernsfisis <nsfisis@gmail.com>2024-08-08 04:02:37 +0900
commit6bb6071ec1dce0cce59df0cb1c38168918061041 (patch)
treefeca06471f51ac5a1cdaaed4a0333ee9c0cf38e3 /backend
parent401a28944fc0408811aedadd1c3104e2e2d4d7fe (diff)
downloadiosdc-japan-2025-albatross-6bb6071ec1dce0cce59df0cb1c38168918061041.tar.gz
iosdc-japan-2025-albatross-6bb6071ec1dce0cce59df0cb1c38168918061041.tar.zst
iosdc-japan-2025-albatross-6bb6071ec1dce0cce59df0cb1c38168918061041.zip
refactor(backend): move ownership of channel to send task results
Diffstat (limited to 'backend')
-rw-r--r--backend/game/hub.go38
-rw-r--r--backend/main.go6
-rw-r--r--backend/taskqueue/processor.go35
-rw-r--r--backend/taskqueue/tasks.go7
-rw-r--r--backend/taskqueue/worker_server.go12
5 files changed, 65 insertions, 33 deletions
diff --git a/backend/game/hub.go b/backend/game/hub.go
index 719b216..8afc802 100644
--- a/backend/game/hub.go
+++ b/backend/game/hub.go
@@ -33,7 +33,7 @@ type gameHub struct {
watchers map[*watcherClient]bool
registerWatcher chan *watcherClient
unregisterWatcher chan *watcherClient
- testcaseExecution chan string
+ testcaseExecution chan taskqueue.TaskExecResult
}
func newGameHub(ctx context.Context, game *game, q *db.Queries, taskQueue *taskqueue.Queue) *gameHub {
@@ -49,7 +49,7 @@ func newGameHub(ctx context.Context, game *game, q *db.Queries, taskQueue *taskq
watchers: make(map[*watcherClient]bool),
registerWatcher: make(chan *watcherClient),
unregisterWatcher: make(chan *watcherClient),
- testcaseExecution: make(chan string),
+ testcaseExecution: make(chan taskqueue.TaskExecResult),
}
}
@@ -169,14 +169,13 @@ func (hub *gameHub) run() {
default:
log.Printf("unexpected message type: %T", message.message)
}
- case executionStatus := <-hub.testcaseExecution:
- _ = executionStatus
+ case executionResult := <-hub.testcaseExecution:
for player := range hub.players {
player.s2cMessages <- &playerMessageS2CExecResult{
Type: playerMessageTypeS2CExecResult,
Data: playerMessageS2CExecResultPayload{
Score: nil,
- Status: api.GamePlayerMessageS2CExecResultPayloadStatus(executionStatus),
+ Status: api.GamePlayerMessageS2CExecResultPayloadStatus(executionResult.Result),
},
}
}
@@ -261,16 +260,18 @@ func (hub *gameHub) closeWatcherClient(watcher *watcherClient) {
}
type GameHubs struct {
- hubs map[int]*gameHub
- q *db.Queries
- taskQueue *taskqueue.Queue
+ hubs map[int]*gameHub
+ q *db.Queries
+ taskQueue *taskqueue.Queue
+ taskResults chan taskqueue.TaskExecResult
}
-func NewGameHubs(q *db.Queries, taskQueue *taskqueue.Queue) *GameHubs {
+func NewGameHubs(q *db.Queries, taskQueue *taskqueue.Queue, taskResults chan taskqueue.TaskExecResult) *GameHubs {
return &GameHubs{
- hubs: make(map[int]*gameHub),
- q: q,
- taskQueue: taskQueue,
+ hubs: make(map[int]*gameHub),
+ q: q,
+ taskQueue: taskQueue,
+ taskResults: taskResults,
}
}
@@ -328,6 +329,15 @@ func (hubs *GameHubs) Run() {
for _, hub := range hubs.hubs {
go hub.run()
}
+
+ for taskResult := range hubs.taskResults {
+ hub := hubs.getHub(taskResult.Task.GameID)
+ if hub == nil {
+ log.Printf("no such game: %d", taskResult.Task.GameID)
+ continue
+ }
+ hub.testcaseExecution <- taskResult
+ }
}
func (hubs *GameHubs) SockHandler() *sockHandler {
@@ -341,7 +351,3 @@ func (hubs *GameHubs) StartGame(gameID int) error {
}
return hub.startGame()
}
-
-func (hubs *GameHubs) C() chan string {
- return hubs.hubs[7].testcaseExecution
-}
diff --git a/backend/main.go b/backend/main.go
index e3d0052..62499d6 100644
--- a/backend/main.go
+++ b/backend/main.go
@@ -61,8 +61,9 @@ func main() {
e.Use(middleware.Recover())
taskQueue := taskqueue.NewQueue("task-db:6379")
+ workerServer := taskqueue.NewWorkerServer("task-db:6379", queries)
- gameHubs := game.NewGameHubs(queries, taskQueue)
+ gameHubs := game.NewGameHubs(queries, taskQueue, workerServer.Results())
err = gameHubs.RestoreFromDB(ctx)
if err != nil {
log.Fatalf("Error restoring game hubs from db %v", err)
@@ -95,9 +96,8 @@ func main() {
return c.Redirect(http.StatusPermanentRedirect, "http://localhost:5173/logout")
})
- gameHubs.Run()
+ go gameHubs.Run()
- workerServer := taskqueue.NewWorkerServer("task-db:6379", queries, gameHubs.C())
go func() {
workerServer.Run()
}()
diff --git a/backend/taskqueue/processor.go b/backend/taskqueue/processor.go
index 1d4c412..ca180e9 100644
--- a/backend/taskqueue/processor.go
+++ b/backend/taskqueue/processor.go
@@ -14,14 +14,14 @@ import (
)
type ExecProcessor struct {
- q *db.Queries
- c chan string
+ q *db.Queries
+ results chan TaskExecResult
}
-func NewExecProcessor(q *db.Queries, c chan string) *ExecProcessor {
+func NewExecProcessor(q *db.Queries, results chan TaskExecResult) *ExecProcessor {
return &ExecProcessor{
- q: q,
- c: c,
+ q: q,
+ results: results,
}
}
@@ -80,7 +80,10 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
if err != nil {
return fmt.Errorf("CreateTestcaseExecution failed: %v", err)
}
- p.c <- "compile_error"
+ p.results <- TaskExecResult{
+ Task: &payload,
+ Result: "compile_error",
+ }
return fmt.Errorf("swiftc failed: %v", resData.Stderr)
}
}
@@ -121,7 +124,10 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
if err != nil {
return fmt.Errorf("CreateTestcaseExecution failed: %v", err)
}
- p.c <- "compile_error"
+ p.results <- TaskExecResult{
+ Task: &payload,
+ Result: "compile_error",
+ }
return fmt.Errorf("wasmc failed: %v", resData.Stderr)
}
}
@@ -170,7 +176,10 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
if err != nil {
return fmt.Errorf("CreateTestcaseExecution failed: %v", err)
}
- p.c <- resData.Result
+ p.results <- TaskExecResult{
+ Task: &payload,
+ Result: resData.Result,
+ }
return fmt.Errorf("testrun failed: %v", resData.Stderr)
}
if !isTestcaseExecutionCorrect(testcase.Stdout, resData.Stdout) {
@@ -184,12 +193,18 @@ func (p *ExecProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
if err != nil {
return fmt.Errorf("CreateTestcaseExecution failed: %v", err)
}
- p.c <- "wrong_answer"
+ p.results <- TaskExecResult{
+ Task: &payload,
+ Result: "wrong_answer",
+ }
return fmt.Errorf("testrun failed: %v", resData.Stdout)
}
}
- p.c <- "success"
+ p.results <- TaskExecResult{
+ Task: &payload,
+ Result: "success",
+ }
return nil
}
diff --git a/backend/taskqueue/tasks.go b/backend/taskqueue/tasks.go
index cd67948..db05553 100644
--- a/backend/taskqueue/tasks.go
+++ b/backend/taskqueue/tasks.go
@@ -27,3 +27,10 @@ func NewExecTask(gameID, userID int, code string) (*asynq.Task, error) {
}
return asynq.NewTask(TaskTypeExec, payload), nil
}
+
+type TaskExecResult struct {
+ Task *TaskExecPlayload
+ Result string
+ Stdout string
+ Stderr string
+}
diff --git a/backend/taskqueue/worker_server.go b/backend/taskqueue/worker_server.go
index 485d6d3..67803e2 100644
--- a/backend/taskqueue/worker_server.go
+++ b/backend/taskqueue/worker_server.go
@@ -9,10 +9,10 @@ import (
type WorkerServer struct {
server *asynq.Server
queries *db.Queries
- c chan string
+ results chan TaskExecResult
}
-func NewWorkerServer(redisAddr string, queries *db.Queries, c chan string) *WorkerServer {
+func NewWorkerServer(redisAddr string, queries *db.Queries) *WorkerServer {
return &WorkerServer{
server: asynq.NewServer(
asynq.RedisClientOpt{
@@ -21,13 +21,17 @@ func NewWorkerServer(redisAddr string, queries *db.Queries, c chan string) *Work
asynq.Config{},
),
queries: queries,
- c: c,
+ results: make(chan TaskExecResult),
}
}
func (s *WorkerServer) Run() error {
mux := asynq.NewServeMux()
- mux.Handle(TaskTypeExec, NewExecProcessor(s.queries, s.c))
+ mux.Handle(TaskTypeExec, NewExecProcessor(s.queries, s.results))
return s.server.Run(mux)
}
+
+func (s *WorkerServer) Results() chan TaskExecResult {
+ return s.results
+}