diff options
Diffstat (limited to 'backend/cmd_serve.go')
| -rw-r--r-- | backend/cmd_serve.go | 215 |
1 files changed, 215 insertions, 0 deletions
diff --git a/backend/cmd_serve.go b/backend/cmd_serve.go new file mode 100644 index 0000000..66ceaf9 --- /dev/null +++ b/backend/cmd_serve.go @@ -0,0 +1,215 @@ +package main + +import ( + "context" + "database/sql" + "embed" + "fmt" + "log" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "github.com/99designs/gqlgen/graphql/handler" + "github.com/99designs/gqlgen/graphql/handler/extension" + "github.com/99designs/gqlgen/graphql/handler/lru" + "github.com/99designs/gqlgen/graphql/handler/transport" + "github.com/hashicorp/go-multierror" + "github.com/labstack/echo/v4" + "github.com/labstack/echo/v4/middleware" + "github.com/mmcdole/gofeed" + "github.com/vektah/gqlparser/v2/ast" + + "undef.ninja/x/feedaka/db" + "undef.ninja/x/feedaka/graphql" + "undef.ninja/x/feedaka/graphql/resolver" +) + +var ( + //go:embed public/* + publicFS embed.FS +) + +func fetchOneFeed(feedID int64, url string, ctx context.Context, queries *db.Queries) error { + log.Printf("Fetching %s...\n", url) + fp := gofeed.NewParser() + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + feed, err := fp.ParseURLWithContext(url, ctx) + if err != nil { + return fmt.Errorf("Failed to fetch %s: %v\n", url, err) + } + err = queries.UpdateFeedMetadata(ctx, db.UpdateFeedMetadataParams{ + Title: feed.Title, + FetchedAt: time.Now().UTC().Format(time.RFC3339), + ID: feedID, + }) + if err != nil { + return err + } + guids, err := queries.GetArticleGUIDsByFeed(ctx, feedID) + if err != nil { + return err + } + existingArticleGUIDs := make(map[string]bool) + for _, guid := range guids { + existingArticleGUIDs[guid] = true + } + for _, item := range feed.Items { + if existingArticleGUIDs[item.GUID] { + err := queries.UpdateArticle(ctx, db.UpdateArticleParams{ + Title: item.Title, + Url: item.Link, + FeedID: feedID, + Guid: item.GUID, + }) + if err != nil { + return err + } + } else { + _, err := queries.CreateArticle(ctx, db.CreateArticleParams{ + FeedID: feedID, + Guid: item.GUID, + Title: item.Title, + Url: item.Link, + IsRead: 0, + }) + if err != nil { + return err + } + } + } + return nil +} + +func listFeedsToBeFetched(ctx context.Context, queries *db.Queries) (map[int64]string, error) { + feeds, err := queries.GetFeedsToFetch(ctx) + if err != nil { + return nil, err + } + + result := make(map[int64]string) + for _, feed := range feeds { + fetchedAtTime, err := time.Parse(time.RFC3339, feed.FetchedAt) + if err != nil { + log.Fatal(err) + } + now := time.Now().UTC() + if now.Sub(fetchedAtTime).Minutes() <= 10 { + continue + } + result[feed.ID] = feed.Url + } + return result, nil +} + +func fetchAllFeeds(ctx context.Context, queries *db.Queries) error { + feeds, err := listFeedsToBeFetched(ctx, queries) + if err != nil { + return err + } + + var result *multierror.Error + for feedID, url := range feeds { + err := fetchOneFeed(feedID, url, ctx, queries) + if err != nil { + result = multierror.Append(result, err) + } + time.Sleep(5 * time.Second) + } + return result.ErrorOrNil() +} + +func scheduled(ctx context.Context, d time.Duration, fn func()) { + ticker := time.NewTicker(d) + go func() { + for { + select { + case <-ticker.C: + fn() + case <-ctx.Done(): + return + } + } + }() +} + +func runServe(database *sql.DB) { + port := os.Getenv("FEEDAKA_PORT") + if port == "" { + port = "8080" + } + + err := db.ValidateSchemaVersion(database) + if err != nil { + log.Fatal(err) + } + + queries := db.New(database) + + e := echo.New() + + e.Use(middleware.Logger()) + e.Use(middleware.Recover()) + e.Use(middleware.CORS()) + + e.Use(middleware.StaticWithConfig(middleware.StaticConfig{ + HTML5: true, + Root: "public", + Filesystem: http.FS(publicFS), + })) + + // Setup GraphQL server + srv := handler.New(graphql.NewExecutableSchema(graphql.Config{Resolvers: &resolver.Resolver{DB: database, Queries: queries}})) + + srv.AddTransport(transport.Options{}) + srv.AddTransport(transport.GET{}) + srv.AddTransport(transport.POST{}) + + srv.SetQueryCache(lru.New[*ast.QueryDocument](1000)) + + srv.Use(extension.Introspection{}) + srv.Use(extension.AutomaticPersistedQuery{ + Cache: lru.New[string](100), + }) + + // GraphQL endpoints + e.POST("/graphql", echo.WrapHandler(srv)) + e.GET("/graphql", echo.WrapHandler(srv)) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + scheduled(ctx, 1*time.Hour, func() { + err := fetchAllFeeds(ctx, queries) + if err != nil { + log.Printf("Failed to fetch feeds: %v\n", err) + } + }) + + // Setup graceful shutdown + go func() { + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + <-sigChan + + log.Println("Shutting down server...") + cancel() + + // Give time for graceful shutdown + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer shutdownCancel() + + if err := e.Shutdown(shutdownCtx); err != nil { + log.Printf("Error during shutdown: %v\n", err) + } + }() + + log.Printf("Server starting on port %s...\n", port) + err = e.Start(":" + port) + if err != nil && err != http.ErrServerClosed { + log.Printf("Server error: %v\n", err) + } + log.Println("Server stopped") +} |
