From 791b109d6f5a4e35c1fdd158d082e86225306db3 Mon Sep 17 00:00:00 2001
From: zeripath <art27@cantab.net>
Date: Sun, 16 Feb 2020 00:29:43 +0000
Subject: [PATCH] Notification: queue ui.go notification-service (#10281)

Co-authored-by: Lauris BH <lauris@nix.lv>
---
 modules/notification/ui/ui.go | 43 ++++++++++++++++++++++-------------
 1 file changed, 27 insertions(+), 16 deletions(-)

diff --git a/modules/notification/ui/ui.go b/modules/notification/ui/ui.go
index a8c904e22c..4d80d43c91 100644
--- a/modules/notification/ui/ui.go
+++ b/modules/notification/ui/ui.go
@@ -6,14 +6,16 @@ package ui
 
 import (
 	"code.gitea.io/gitea/models"
+	"code.gitea.io/gitea/modules/graceful"
 	"code.gitea.io/gitea/modules/log"
 	"code.gitea.io/gitea/modules/notification/base"
+	"code.gitea.io/gitea/modules/queue"
 )
 
 type (
 	notificationService struct {
 		base.NullNotifier
-		issueQueue chan issueNotificationOpts
+		issueQueue queue.Queue
 	}
 
 	issueNotificationOpts struct {
@@ -29,19 +31,24 @@ var (
 
 // NewNotifier create a new notificationService notifier
 func NewNotifier() base.Notifier {
-	return &notificationService{
-		issueQueue: make(chan issueNotificationOpts, 100),
-	}
+	ns := &notificationService{}
+	ns.issueQueue = queue.CreateQueue("notification-service", ns.handle, issueNotificationOpts{})
+	return ns
 }
 
-func (ns *notificationService) Run() {
-	for opts := range ns.issueQueue {
+func (ns *notificationService) handle(data ...queue.Data) {
+	for _, datum := range data {
+		opts := datum.(issueNotificationOpts)
 		if err := models.CreateOrUpdateIssueNotifications(opts.issueID, opts.commentID, opts.notificationAuthorID); err != nil {
 			log.Error("Was unable to create issue notification: %v", err)
 		}
 	}
 }
 
+func (ns *notificationService) Run() {
+	graceful.GetManager().RunWithShutdownFns(ns.issueQueue.Run)
+}
+
 func (ns *notificationService) NotifyCreateIssueComment(doer *models.User, repo *models.Repository,
 	issue *models.Issue, comment *models.Comment) {
 	var opts = issueNotificationOpts{
@@ -51,35 +58,39 @@ func (ns *notificationService) NotifyCreateIssueComment(doer *models.User, repo
 	if comment != nil {
 		opts.commentID = comment.ID
 	}
-	ns.issueQueue <- opts
+	_ = ns.issueQueue.Push(opts)
 }
 
 func (ns *notificationService) NotifyNewIssue(issue *models.Issue) {
-	ns.issueQueue <- issueNotificationOpts{
+	_ = ns.issueQueue.Push(issueNotificationOpts{
 		issueID:              issue.ID,
 		notificationAuthorID: issue.Poster.ID,
-	}
+	})
 }
 
 func (ns *notificationService) NotifyIssueChangeStatus(doer *models.User, issue *models.Issue, actionComment *models.Comment, isClosed bool) {
-	ns.issueQueue <- issueNotificationOpts{
+	_ = ns.issueQueue.Push(issueNotificationOpts{
 		issueID:              issue.ID,
 		notificationAuthorID: doer.ID,
-	}
+	})
 }
 
 func (ns *notificationService) NotifyMergePullRequest(pr *models.PullRequest, doer *models.User) {
-	ns.issueQueue <- issueNotificationOpts{
+	_ = ns.issueQueue.Push(issueNotificationOpts{
 		issueID:              pr.Issue.ID,
 		notificationAuthorID: doer.ID,
-	}
+	})
 }
 
 func (ns *notificationService) NotifyNewPullRequest(pr *models.PullRequest) {
-	ns.issueQueue <- issueNotificationOpts{
+	if err := pr.LoadIssue(); err != nil {
+		log.Error("Unable to load issue: %d for pr: %d: Error: %v", pr.IssueID, pr.ID, err)
+		return
+	}
+	_ = ns.issueQueue.Push(issueNotificationOpts{
 		issueID:              pr.Issue.ID,
 		notificationAuthorID: pr.Issue.PosterID,
-	}
+	})
 }
 
 func (ns *notificationService) NotifyPullRequestReview(pr *models.PullRequest, r *models.Review, c *models.Comment) {
@@ -90,5 +101,5 @@ func (ns *notificationService) NotifyPullRequestReview(pr *models.PullRequest, r
 	if c != nil {
 		opts.commentID = c.ID
 	}
-	ns.issueQueue <- opts
+	_ = ns.issueQueue.Push(opts)
 }