From 2f786bf424b20261bc57ef15edb27720bc670963 Mon Sep 17 00:00:00 2001 From: unknown Date: Tue, 17 May 2011 00:04:49 -0700 Subject: [PATCH] JobProvider now fully works based on a queuing logic, which allows more than one job to be queued. (EasyButton included!) --- NzbDrone.Core.Test/JobProviderTest.cs | 91 +++++++++++++------ NzbDrone.Core.Test/MockLib.cs | 2 +- .../Providers/Jobs/DeleteSeriesJob.cs | 6 +- NzbDrone.Core/Providers/Jobs/JobProvider.cs | 87 ++++++++++++++++-- .../Providers/Jobs/MediaFileScanJob.cs | 4 +- .../Providers/Jobs/NewSeriesUpdate.cs | 12 ++- .../Controllers/AddSeriesController.cs | 2 +- NzbDrone.Web/Controllers/SeriesController.cs | 6 +- NzbDrone.sln | 1 + 9 files changed, 160 insertions(+), 51 deletions(-) diff --git a/NzbDrone.Core.Test/JobProviderTest.cs b/NzbDrone.Core.Test/JobProviderTest.cs index 6c13ed0c8..5dd4041fe 100644 --- a/NzbDrone.Core.Test/JobProviderTest.cs +++ b/NzbDrone.Core.Test/JobProviderTest.cs @@ -15,11 +15,11 @@ public class JobProviderTest [Test] public void Run_Jobs_Updates_Last_Execution() { - IEnumerable fakeTimers = new List { new FakeJob() }; + IEnumerable fakeJobs = new List { new FakeJob() }; var mocker = new AutoMoqer(); mocker.SetConstant(MockLib.GetEmptyRepository()); - mocker.SetConstant(fakeTimers); + mocker.SetConstant(fakeJobs); //Act var timerProvider = mocker.Resolve(); @@ -37,11 +37,11 @@ public void Run_Jobs_Updates_Last_Execution() public void Run_Jobs_Updates_Last_Execution_Mark_as_unsuccesful() { - IEnumerable fakeTimers = new List { new BrokenJob() }; + IEnumerable fakeJobs = new List { new BrokenJob() }; var mocker = new AutoMoqer(); mocker.SetConstant(MockLib.GetEmptyRepository()); - mocker.SetConstant(fakeTimers); + mocker.SetConstant(fakeJobs); //Act var timerProvider = mocker.Resolve(); @@ -61,11 +61,11 @@ public void Run_Jobs_Updates_Last_Execution_Mark_as_unsuccesful() //after execution so the job can successfully run. public void can_run_job_again() { - IEnumerable fakeTimers = new List { new FakeJob() }; + IEnumerable fakeJobs = new List { new FakeJob() }; var mocker = new AutoMoqer(); mocker.SetConstant(MockLib.GetEmptyRepository()); - mocker.SetConstant(fakeTimers); + mocker.SetConstant(fakeJobs); var timerProvider = mocker.Resolve(); timerProvider.Initialize(); @@ -82,11 +82,11 @@ public void can_run_job_again() //after execution so the job can successfully run. public void can_run_broken_job_again() { - IEnumerable fakeTimers = new List { new BrokenJob() }; + IEnumerable fakeJobs = new List { new BrokenJob() }; var mocker = new AutoMoqer(); mocker.SetConstant(MockLib.GetEmptyRepository()); - mocker.SetConstant(fakeTimers); + mocker.SetConstant(fakeJobs); var timerProvider = mocker.Resolve(); timerProvider.Initialize(); @@ -103,17 +103,17 @@ public void can_run_broken_job_again() //after execution so the job can successfully run. public void can_run_async_job_again() { - IEnumerable fakeTimers = new List { new FakeJob() }; + IEnumerable fakeJobs = new List { new FakeJob() }; var mocker = new AutoMoqer(); mocker.SetConstant(MockLib.GetEmptyRepository()); - mocker.SetConstant(fakeTimers); + mocker.SetConstant(fakeJobs); var timerProvider = mocker.Resolve(); timerProvider.Initialize(); - var firstRun = timerProvider.BeginExecute(typeof(FakeJob)); + var firstRun = timerProvider.QueueJob(typeof(FakeJob)); Thread.Sleep(2000); - var secondRun = timerProvider.BeginExecute(typeof(FakeJob)); + var secondRun = timerProvider.QueueJob(typeof(FakeJob)); Assert.IsTrue(firstRun); Assert.IsTrue(secondRun); @@ -125,17 +125,17 @@ public void can_run_async_job_again() //after execution so the job can successfully run. public void can_run_broken_async_job_again() { - IEnumerable fakeTimers = new List { new BrokenJob() }; + IEnumerable fakeJobs = new List { new BrokenJob() }; var mocker = new AutoMoqer(); mocker.SetConstant(MockLib.GetEmptyRepository()); - mocker.SetConstant(fakeTimers); + mocker.SetConstant(fakeJobs); var timerProvider = mocker.Resolve(); timerProvider.Initialize(); - var firstRun = timerProvider.BeginExecute(typeof(FakeJob)); + var firstRun = timerProvider.QueueJob(typeof(FakeJob)); Thread.Sleep(2000); - var secondRun = timerProvider.BeginExecute(typeof(FakeJob)); + var secondRun = timerProvider.QueueJob(typeof(FakeJob)); Assert.IsTrue(firstRun); Assert.IsTrue(secondRun); @@ -146,11 +146,11 @@ public void can_run_broken_async_job_again() //after execution so the job can successfully run. public void can_run_two_jobs_at_the_same_time() { - IEnumerable fakeTimers = new List { new SlowJob() }; + IEnumerable fakeJobs = new List { new SlowJob() }; var mocker = new AutoMoqer(); mocker.SetConstant(MockLib.GetEmptyRepository()); - mocker.SetConstant(fakeTimers); + mocker.SetConstant(fakeJobs); var timerProvider = mocker.Resolve(); timerProvider.Initialize(); @@ -172,15 +172,47 @@ public void can_run_two_jobs_at_the_same_time() } + + [Test] + //This test will confirm that the concurrency checks are rest + //after execution so the job can successfully run. + public void can_queue_jobs_at_the_same_time() + { + var slowJob = new SlowJob(); + + IEnumerable fakeJobs = new List { slowJob }; + var mocker = new AutoMoqer(); + + mocker.SetConstant(MockLib.GetEmptyRepository()); + mocker.SetConstant(fakeJobs); + + var timerProvider = mocker.Resolve(); + timerProvider.Initialize(); + + var thread1 = new Thread(() => timerProvider.QueueJob(typeof(SlowJob))); + var thread2 = new Thread(() => timerProvider.QueueJob(typeof(SlowJob))); + + thread1.Start(); + thread2.Start(); + + thread1.Join(); + thread2.Join(); + + Thread.Sleep(5000); + + Assert.AreEqual(1, slowJob.ExexutionCount); + + } + [Test] public void Init_Jobs() { var fakeTimer = new FakeJob(); - IEnumerable fakeTimers = new List { fakeTimer }; + IEnumerable fakeJobs = new List { fakeTimer }; var mocker = new AutoMoqer(); mocker.SetConstant(MockLib.GetEmptyRepository()); - mocker.SetConstant(fakeTimers); + mocker.SetConstant(fakeJobs); var timerProvider = mocker.Resolve(); timerProvider.Initialize(); @@ -207,11 +239,11 @@ public void Init_Timers_only_registers_once() for (int i = 0; i < 2; i++) { var fakeTimer = new FakeJob(); - IEnumerable fakeTimers = new List { fakeTimer }; + IEnumerable fakeJobs = new List { fakeTimer }; var mocker = new AutoMoqer(); mocker.SetConstant(repo); - mocker.SetConstant(fakeTimers); + mocker.SetConstant(fakeJobs); var timerProvider = mocker.Resolve(); timerProvider.Initialize(); @@ -238,11 +270,11 @@ public void Init_Timers_sets_interval_0_to_disabled() for (int i = 0; i < 2; i++) { var disabledJob = new DisabledJob(); - IEnumerable fakeTimers = new List { disabledJob }; + IEnumerable fakeJobs = new List { disabledJob }; var mocker = new AutoMoqer(); mocker.SetConstant(repo); - mocker.SetConstant(fakeTimers); + mocker.SetConstant(fakeJobs); var timerProvider = mocker.Resolve(); timerProvider.Initialize(); @@ -264,11 +296,11 @@ public void Init_Timers_sets_interval_0_to_disabled() [Test] public void Get_Next_Execution_Time() { - IEnumerable fakeTimers = new List { new FakeJob() }; + IEnumerable fakeJobs = new List { new FakeJob() }; var mocker = new AutoMoqer(); mocker.SetConstant(MockLib.GetEmptyRepository()); - mocker.SetConstant(fakeTimers); + mocker.SetConstant(fakeJobs); //Act var timerProvider = mocker.Resolve(); @@ -333,7 +365,7 @@ public int DefaultInterval public void Start(ProgressNotification notification, int targetId) { - throw new InvalidOperationException(); + throw new ApplicationException("Broken job is broken"); } } @@ -349,9 +381,12 @@ public int DefaultInterval get { return 15; } } + public int ExexutionCount { get; set; } + public void Start(ProgressNotification notification, int targetId) { - Thread.Sleep(5000); + Thread.Sleep(2000); + ExexutionCount++; } } } \ No newline at end of file diff --git a/NzbDrone.Core.Test/MockLib.cs b/NzbDrone.Core.Test/MockLib.cs index 26925d384..2945d2580 100644 --- a/NzbDrone.Core.Test/MockLib.cs +++ b/NzbDrone.Core.Test/MockLib.cs @@ -35,7 +35,7 @@ public static ConfigProvider StandardConfig public static IRepository GetEmptyRepository() { - return GetEmptyRepository(true); + return GetEmptyRepository(false); } public static IRepository GetEmptyRepository(bool enableLogging) diff --git a/NzbDrone.Core/Providers/Jobs/DeleteSeriesJob.cs b/NzbDrone.Core/Providers/Jobs/DeleteSeriesJob.cs index 073736f89..1f3d63ca1 100644 --- a/NzbDrone.Core/Providers/Jobs/DeleteSeriesJob.cs +++ b/NzbDrone.Core/Providers/Jobs/DeleteSeriesJob.cs @@ -40,11 +40,13 @@ private void DeleteSeries(ProgressNotification notification, int seriesId) try { - notification.CurrentMessage = String.Format("Beginning Delete of Series: {0}", seriesId); + var title = _seriesProvider.GetSeries(seriesId).Title; + + notification.CurrentMessage = String.Format("Deleting '{0}' from database", title); _seriesProvider.DeleteSeries(seriesId); - notification.CurrentMessage = String.Format("Successfully deleted Series: {0}", seriesId); + notification.CurrentMessage = String.Format("Successfully deleted '{0}'", title); } catch (Exception e) { diff --git a/NzbDrone.Core/Providers/Jobs/JobProvider.cs b/NzbDrone.Core/Providers/Jobs/JobProvider.cs index 9f962ef43..d252aa270 100644 --- a/NzbDrone.Core/Providers/Jobs/JobProvider.cs +++ b/NzbDrone.Core/Providers/Jobs/JobProvider.cs @@ -20,6 +20,7 @@ public class JobProvider private static readonly object ExecutionLock = new object(); private Thread _jobThread; private static bool _isRunning; + private static readonly List> Queue = new List>(); private ProgressNotification _notification; @@ -86,7 +87,7 @@ public virtual bool RunScheduled() foreach (var pendingTimer in pendingJobs) { var timerClass = _jobs.Where(t => t.GetType().ToString() == pendingTimer.TypeName).FirstOrDefault(); - Execute(timerClass.GetType(), 0); + Execute(timerClass.GetType()); } } finally @@ -101,29 +102,48 @@ public virtual bool RunScheduled() /// Starts the execution of a job asynchronously /// /// Type of the job that should be executed. + /// If the job is allowed to be queued in case another task is aready running. /// The targetId could be any Id parameter eg. SeriesId. it will be passed to the job implementation /// to allow it to filter it's target of execution. /// True if ran, false if skipped - public virtual bool BeginExecute(Type jobType, int targetId = 0) + /// Job is only added to the queue if same job with the same targetId doesn't already exist in the queue. + public virtual bool QueueJob(Type jobType, int targetId = 0) { + Logger.Debug("Adding job {0} ->{1} to the queue", jobType, targetId); + lock (Queue) + { + var queueTuple = new Tuple(jobType, targetId); + + if (Queue.Contains(queueTuple)) + { + Logger.Info("Job {0} ->{1} already exists in queue. Skipping.", jobType, targetId); + return false; + } + + Queue.Add(queueTuple); + Logger.Debug("Job {0} ->{1} added to the queue", jobType, targetId); + + } + lock (ExecutionLock) { if (_isRunning) { - Logger.Info("Another job is already running. Ignoring request."); - return false; + Logger.Trace("Queue is already running. Ignoreing request."); + return true; } - _isRunning = true; + } + if (_jobThread == null || !_jobThread.IsAlive) { - Logger.Trace("Initializing background thread"); + Logger.Trace("Initializing queue processor thread"); ThreadStart starter = () => { try { - Execute(jobType, targetId); + ProcessQueue(); } finally { @@ -131,7 +151,7 @@ public virtual bool BeginExecute(Type jobType, int targetId = 0) } }; - _jobThread = new Thread(starter) { Name = "TimerThread", Priority = ThreadPriority.BelowNormal }; + _jobThread = new Thread(starter) { Name = "JobQueueThread", Priority = ThreadPriority.BelowNormal }; _jobThread.Start(); } @@ -143,6 +163,55 @@ public virtual bool BeginExecute(Type jobType, int targetId = 0) return true; } + + /// + /// Starts processing of queue. + /// + private void ProcessQueue() + { + Tuple job = null; + + try + { + lock (Queue) + { + if (Queue.Count != 0) + { + job = Queue[0]; + } + } + + if (job != null) + { + Execute(job.Item1, job.Item2); + } + + } + catch (Exception e) + { + Logger.FatalException("An error has occured while processing queued job.", e); + } + finally + { + if (job != null) + { + Queue.Remove(job); + } + } + + //Try to find next job is last run found a job. + if (job != null) + { + ProcessQueue(); + } + else + { + Logger.Debug("Finished processing jobs in the queue."); + } + + return; + } + /// /// Executes the job /// @@ -221,7 +290,7 @@ public virtual void Initialize() /// Gets the next scheduled run time for the job /// (Estimated due to schedule timer) /// - /// DateTime of next scheduled job + /// DateTime of next scheduled job execution public virtual DateTime NextScheduledRun(Type jobType) { var job = All().Where(t => t.TypeName == jobType.ToString()).FirstOrDefault(); diff --git a/NzbDrone.Core/Providers/Jobs/MediaFileScanJob.cs b/NzbDrone.Core/Providers/Jobs/MediaFileScanJob.cs index de757c6fb..a53a10672 100644 --- a/NzbDrone.Core/Providers/Jobs/MediaFileScanJob.cs +++ b/NzbDrone.Core/Providers/Jobs/MediaFileScanJob.cs @@ -42,9 +42,9 @@ public void Start(ProgressNotification notification, int targetId) foreach (var series in seriesToScan) { - notification.CurrentMessage = "Scanning for files: " + series.Title; + notification.CurrentMessage = string.Format("Scanning disk for '{0}'", series.Title); _mediaFileProvider.Scan(series); - notification.CurrentMessage = "Media File Scan completed for " + series.Title; + notification.CurrentMessage = string.Format("Media File Scan completed for '{0}'", series.Title); } } } diff --git a/NzbDrone.Core/Providers/Jobs/NewSeriesUpdate.cs b/NzbDrone.Core/Providers/Jobs/NewSeriesUpdate.cs index f2af6d1d3..70190f52e 100644 --- a/NzbDrone.Core/Providers/Jobs/NewSeriesUpdate.cs +++ b/NzbDrone.Core/Providers/Jobs/NewSeriesUpdate.cs @@ -39,21 +39,23 @@ public void Start(ProgressNotification notification, int targetId) private void ScanSeries(ProgressNotification notification) { var syncList = _seriesProvider.GetAllSeries().Where(s => s.LastInfoSync == null).ToList(); - if (syncList.Count == 0) return; + if (syncList.Count == 0) + { + return; + } foreach (var currentSeries in syncList) { try { - notification.CurrentMessage = String.Format("Searching For: {0}", new DirectoryInfo(currentSeries.Path).Name); + notification.CurrentMessage = String.Format("Searching for '{0}'", new DirectoryInfo(currentSeries.Path).Name); var updatedSeries = _seriesProvider.UpdateSeriesInfo(currentSeries.SeriesId); - notification.CurrentMessage = String.Format("Downloading episode info For: {0}", + notification.CurrentMessage = String.Format("Downloading episode info for '{0}'", updatedSeries.Title); _episodeProvider.RefreshEpisodeInfo(updatedSeries.SeriesId); - notification.CurrentMessage = String.Format("Scanning disk for {0} files", - updatedSeries.Title); + notification.CurrentMessage = String.Format("Scanning disk for '{0}' files", updatedSeries.Title); _mediaFileProvider.Scan(_seriesProvider.GetSeries(updatedSeries.SeriesId)); } diff --git a/NzbDrone.Web/Controllers/AddSeriesController.cs b/NzbDrone.Web/Controllers/AddSeriesController.cs index 08845e628..cef67ebaa 100644 --- a/NzbDrone.Web/Controllers/AddSeriesController.cs +++ b/NzbDrone.Web/Controllers/AddSeriesController.cs @@ -36,7 +36,7 @@ public AddSeriesController(SyncProvider syncProvider, RootDirProvider rootFolder [HttpPost] public JsonResult ScanNewSeries() { - _jobProvider.BeginExecute(typeof(NewSeriesUpdate)); + _jobProvider.QueueJob(typeof(NewSeriesUpdate)); return new JsonResult(); } diff --git a/NzbDrone.Web/Controllers/SeriesController.cs b/NzbDrone.Web/Controllers/SeriesController.cs index f895df2a0..80ba05d0f 100644 --- a/NzbDrone.Web/Controllers/SeriesController.cs +++ b/NzbDrone.Web/Controllers/SeriesController.cs @@ -54,7 +54,7 @@ public ActionResult Index() public ActionResult RssSync() { - _jobProvider.BeginExecute(typeof(RssSyncJob)); + _jobProvider.QueueJob(typeof(RssSyncJob)); return RedirectToAction("Index"); } @@ -119,7 +119,7 @@ public ActionResult _DeleteAjaxSeriesEditing(int id) seriesInDb.RemoveAll(s => s.SeriesId == id); //Start removing this series - _jobProvider.BeginExecute(typeof(DeleteSeriesJob), id); + _jobProvider.QueueJob(typeof(DeleteSeriesJob), id); var series = GetSeriesModels(seriesInDb); return View(new GridModel(series)); @@ -274,7 +274,7 @@ public ActionResult SyncEpisodesOnDisk(int seriesId) public ActionResult UpdateInfo(int seriesId) { //Syncs the episodes on disk for the specified series - _jobProvider.BeginExecute(typeof(UpdateInfoJob), seriesId); + _jobProvider.QueueJob(typeof(UpdateInfoJob), seriesId); return RedirectToAction("Details", new { seriesId }); } diff --git a/NzbDrone.sln b/NzbDrone.sln index 0b93d492d..0cfcfbde7 100644 --- a/NzbDrone.sln +++ b/NzbDrone.sln @@ -68,6 +68,7 @@ Global {193ADD3B-792B-4173-8E4C-5A3F8F0237F0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {193ADD3B-792B-4173-8E4C-5A3F8F0237F0}.Debug|Any CPU.Build.0 = Debug|Any CPU {193ADD3B-792B-4173-8E4C-5A3F8F0237F0}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU + {193ADD3B-792B-4173-8E4C-5A3F8F0237F0}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU {193ADD3B-792B-4173-8E4C-5A3F8F0237F0}.Debug|x64.ActiveCfg = Debug|Any CPU {193ADD3B-792B-4173-8E4C-5A3F8F0237F0}.Debug|x86.ActiveCfg = Debug|Any CPU {193ADD3B-792B-4173-8E4C-5A3F8F0237F0}.Debug|x86.Build.0 = Debug|Any CPU