From 5e77d515109bc7904a84b24db58fc4b09815a6a2 Mon Sep 17 00:00:00 2001 From: Keivan Beigi Date: Mon, 4 Mar 2013 18:34:38 -0800 Subject: [PATCH] Cleaned up JobController. --- .../JobTests/JobControllerFixture.cs | 40 +++--- .../NzbDrone.Core.Test.ncrunchproject | 3 + NzbDrone.Core/Jobs/JobController.cs | 118 ++++++------------ 3 files changed, 59 insertions(+), 102 deletions(-) diff --git a/NzbDrone.Core.Test/JobTests/JobControllerFixture.cs b/NzbDrone.Core.Test/JobTests/JobControllerFixture.cs index 6068855c6..263b4343f 100644 --- a/NzbDrone.Core.Test/JobTests/JobControllerFixture.cs +++ b/NzbDrone.Core.Test/JobTests/JobControllerFixture.cs @@ -79,7 +79,7 @@ public void running_scheduled_jobs_should_updates_last_execution_time() { GivenPendingJob(new List { new JobDefinition { TypeName = _fakeJob.GetType().FullName } }); - Subject.QueueScheduled(); + Subject.EnqueueScheduled(); WaitForQueue(); _updatedJob.LastExecution.Should().BeWithin(TimeSpan.FromSeconds(10)); @@ -92,7 +92,7 @@ public void failing_scheduled_job_should_mark_job_as_failed() { GivenPendingJob(new List { new JobDefinition { TypeName = _brokenJob.GetType().FullName } }); - Subject.QueueScheduled(); + Subject.EnqueueScheduled(); WaitForQueue(); _updatedJob.LastExecution.Should().BeWithin(TimeSpan.FromSeconds(10)); @@ -102,11 +102,11 @@ public void failing_scheduled_job_should_mark_job_as_failed() } [Test] - public void can_run_async_job_again() + public void can_run_job_again() { - Subject.QueueJob(typeof(FakeJob)); + Subject.Enqueue(typeof(FakeJob)); WaitForQueue(); - Subject.QueueJob(typeof(FakeJob)); + Subject.Enqueue(typeof(FakeJob)); WaitForQueue(); Subject.Queue.Should().BeEmpty(); @@ -116,9 +116,9 @@ public void can_run_async_job_again() [Test] public void should_ignore_job_with_same_arg() { - Subject.QueueJob(typeof(SlowJob2), 1); - Subject.QueueJob(typeof(FakeJob), 1); - Subject.QueueJob(typeof(FakeJob), 1); + Subject.Enqueue(typeof(SlowJob2), 1); + Subject.Enqueue(typeof(FakeJob), 1); + Subject.Enqueue(typeof(FakeJob), 1); WaitForQueue(); @@ -131,10 +131,10 @@ public void should_ignore_job_with_same_arg() [Test] public void can_run_broken_job_again() { - Subject.QueueJob(typeof(BrokenJob)); + Subject.Enqueue(typeof(BrokenJob)); WaitForQueue(); - Subject.QueueJob(typeof(BrokenJob)); + Subject.Enqueue(typeof(BrokenJob)); WaitForQueue(); @@ -145,8 +145,8 @@ public void can_run_broken_job_again() [Test] public void schedule_hit_should_be_ignored_if_queue_is_running() { - Subject.QueueJob(typeof(SlowJob)); - Subject.QueueScheduled(); + Subject.Enqueue(typeof(SlowJob)); + Subject.EnqueueScheduled(); WaitForQueue(); _slowJob.ExecutionCount.Should().Be(1); @@ -158,9 +158,9 @@ public void schedule_hit_should_be_ignored_if_queue_is_running() public void can_queue_jobs_at_the_same_time() { - Subject.QueueJob(typeof(SlowJob)); - var thread1 = new Thread(() => Subject.QueueJob(typeof(FakeJob))); - var thread2 = new Thread(() => Subject.QueueJob(typeof(FakeJob))); + Subject.Enqueue(typeof(SlowJob)); + var thread1 = new Thread(() => Subject.Enqueue(typeof(FakeJob))); + var thread2 = new Thread(() => Subject.Enqueue(typeof(FakeJob))); thread1.Start(); thread2.Start(); @@ -179,7 +179,7 @@ public void can_queue_jobs_at_the_same_time() [Test] public void job_with_specific_target_should_not_update_status() { - Subject.QueueJob(typeof(FakeJob), 10); + Subject.Enqueue(typeof(FakeJob), 10); WaitForQueue(); @@ -194,12 +194,12 @@ public void Item_added_to_queue_while_scheduler_runs_should_be_executed() { GivenPendingJob(new List { new JobDefinition { TypeName = _slowJob.GetType().FullName } }); - var jobThread = new Thread(Subject.QueueScheduled); + var jobThread = new Thread(Subject.EnqueueScheduled); jobThread.Start(); Thread.Sleep(200); - Subject.QueueJob(typeof(DisabledJob), 12); + Subject.Enqueue(typeof(DisabledJob), 12); WaitForQueue(); @@ -210,7 +210,7 @@ public void Item_added_to_queue_while_scheduler_runs_should_be_executed() [Test] public void trygin_to_queue_unregistered_job_should_fail() { - Subject.QueueJob(typeof(UpdateInfoJob)); + Subject.Enqueue(typeof(UpdateInfoJob)); WaitForQueue(); ExceptionVerification.ExpectedErrors(1); } @@ -219,7 +219,7 @@ public void trygin_to_queue_unregistered_job_should_fail() public void scheduled_job_should_have_scheduler_as_source() { GivenPendingJob(new List { new JobDefinition { TypeName = _slowJob.GetType().FullName }, new JobDefinition { TypeName = _slowJob2.GetType().FullName } }); - Subject.QueueScheduled(); + Subject.EnqueueScheduled(); Subject.Queue.Should().OnlyContain(c => c.Source == JobQueueItem.JobSourceType.Scheduler); diff --git a/NzbDrone.Core.Test/NzbDrone.Core.Test.ncrunchproject b/NzbDrone.Core.Test/NzbDrone.Core.Test.ncrunchproject index 3d220e091..61dc909a2 100644 --- a/NzbDrone.Core.Test/NzbDrone.Core.Test.ncrunchproject +++ b/NzbDrone.Core.Test/NzbDrone.Core.Test.ncrunchproject @@ -546,6 +546,9 @@ NzbDrone\.Core\.Test\.TvTests\.QualityModelFixture\..* + + NzbDrone\.Core\.Test\.JobTests\.JobControllerFixture\..* + PostBuildEventDisabled;PreBuildEventDisabled \ No newline at end of file diff --git a/NzbDrone.Core/Jobs/JobController.cs b/NzbDrone.Core/Jobs/JobController.cs index 4b7a909f7..c85d5be63 100644 --- a/NzbDrone.Core/Jobs/JobController.cs +++ b/NzbDrone.Core/Jobs/JobController.cs @@ -4,6 +4,7 @@ using System.Diagnostics; using System.Linq; using System.Threading; +using System.Threading.Tasks; using NLog; using NzbDrone.Core.Model; using NzbDrone.Core.Model.Notification; @@ -15,9 +16,9 @@ public interface IJobController { bool IsProcessing { get; } IEnumerable Queue { get; } - void QueueScheduled(); - void QueueJob(Type jobType, dynamic options = null, JobQueueItem.JobSourceType source = JobQueueItem.JobSourceType.User); - bool QueueJob(string jobTypeString); + void EnqueueScheduled(); + void Enqueue(Type jobType, dynamic options = null, JobQueueItem.JobSourceType source = JobQueueItem.JobSourceType.User); + bool Enqueue(string jobTypeString); } public class JobController : IJobController @@ -27,14 +28,10 @@ public class JobController : IJobController private readonly IJobRepository _jobRepository; private readonly Logger _logger; - private Thread _jobThread; - - - - private readonly object _executionLock = new object(); private readonly BlockingCollection _queue = new BlockingCollection(); private ProgressNotification _notification; + private readonly CancellationTokenSource _cancellationTokenSource; public JobController(NotificationProvider notificationProvider, IEnumerable jobs, IJobRepository jobRepository, Logger logger) { @@ -42,12 +39,14 @@ public JobController(NotificationProvider notificationProvider, IEnumerable Queue { get @@ -56,15 +55,12 @@ public IEnumerable Queue } } - public virtual void QueueScheduled() + public void EnqueueScheduled() { - lock (_executionLock) + if (IsProcessing) { - if (_jobThread.IsAlive) - { - _logger.Trace("Queue is already running. Ignoring scheduler's request."); - return; - } + _logger.Trace("Queue is already running. Ignoring scheduler's request."); + return; } var pendingJobs = _jobRepository.GetPendingJobs() @@ -72,11 +68,11 @@ public virtual void QueueScheduled() .GetType()).ToList(); - pendingJobs.ForEach(jobType => QueueJob(jobType, source: JobQueueItem.JobSourceType.Scheduler)); + pendingJobs.ForEach(jobType => Enqueue(jobType, source: JobQueueItem.JobSourceType.Scheduler)); _logger.Trace("{0} Scheduled tasks have been added to the queue", pendingJobs.Count); } - public virtual void QueueJob(Type jobType, dynamic options = null, JobQueueItem.JobSourceType source = JobQueueItem.JobSourceType.User) + public void Enqueue(Type jobType, dynamic options = null, JobQueueItem.JobSourceType source = JobQueueItem.JobSourceType.User) { IsProcessing = true; @@ -89,84 +85,58 @@ public virtual void QueueJob(Type jobType, dynamic options = null, JobQueueItem. _logger.Debug("Attempting to queue {0}", queueItem); - lock (_executionLock) + lock (_queue) { - lock (_queue) + if (!_queue.Contains(queueItem)) { - if (!_queue.Contains(queueItem)) - { - _queue.Add(queueItem); - _logger.Trace("Job {0} added to the queue. current items in queue: {1}", queueItem, _queue.Count); - } - else - { - _logger.Info("{0} already exists in the queue. Skipping. current items in queue: {1}", queueItem, _queue.Count); - } + _queue.Add(queueItem); + _logger.Trace("Job {0} added to the queue. current items in queue: {1}", queueItem, _queue.Count); } - - if (_jobThread.IsAlive) + else { - _logger.Trace("Queue is already running. No need to start it up."); - return; + _logger.Info("{0} already exists in the queue. Skipping. current items in queue: {1}", queueItem, _queue.Count); } - - ResetThread(); - _jobThread.Start(); } } - public virtual bool QueueJob(string jobTypeString) + + public bool Enqueue(string jobTypeString) { var type = Type.GetType(jobTypeString); if (type == null) return false; - QueueJob(type); + Enqueue(type); return true; } private void ProcessQueue() { - try + while (true) { - while (true) + try { IsProcessing = false; var item = _queue.Take(); - IsProcessing = true; - - try - { - Execute(item); - } - catch (ThreadAbortException) - { - throw; - } - catch (Exception e) - { - _logger.FatalException("An error has occurred while executing job.", e); - } + Execute(item); + } + catch (ThreadAbortException e) + { + _logger.Warn(e.Message); + } + catch (Exception e) + { + _logger.ErrorException("Error has occurred in queue processor thread", e); } - } - catch (ThreadAbortException e) - { - _logger.Warn(e.Message); - } - catch (Exception e) - { - _logger.ErrorException("Error has occurred in queue processor thread", e); - } - finally - { - _logger.Trace("Finished processing jobs in the queue."); } } private void Execute(JobQueueItem queueItem) { + IsProcessing = true; + var jobImplementation = _jobs.SingleOrDefault(t => t.GetType() == queueItem.JobType); if (jobImplementation == null) { @@ -194,10 +164,6 @@ private void Execute(JobQueueItem queueItem) _logger.Debug("Job {0} successfully completed in {1:0}.{2} seconds.", queueItem, sw.Elapsed.TotalSeconds, sw.Elapsed.Milliseconds / 100, sw.Elapsed.Seconds); } - catch (ThreadAbortException) - { - throw; - } catch (Exception e) { _logger.ErrorException("An error has occurred while executing job [" + jobImplementation.Name + "].", e); @@ -215,17 +181,5 @@ private void Execute(JobQueueItem queueItem) _jobRepository.Update(jobDefinition); } } - - - private void ResetThread() - { - if (_jobThread != null) - { - _jobThread.Abort(); - } - - _logger.Trace("resetting queue processor thread"); - _jobThread = new Thread(ProcessQueue) { Name = "JobQueueThread" }; - } } } \ No newline at end of file