mirror of
https://github.com/Sonarr/Sonarr.git
synced 2024-12-16 11:37:58 +02:00
Improved Command Queue
New: Limit concurrent tasks that require disk access New: Prevent other tasks from running during application update
This commit is contained in:
parent
07be9cf47a
commit
36b9c51163
@ -1,4 +1,4 @@
|
|||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using Sonarr.Http.Extensions;
|
using Sonarr.Http.Extensions;
|
||||||
@ -10,7 +10,6 @@
|
|||||||
using NzbDrone.Core.ProgressMessaging;
|
using NzbDrone.Core.ProgressMessaging;
|
||||||
using NzbDrone.SignalR;
|
using NzbDrone.SignalR;
|
||||||
using Sonarr.Http;
|
using Sonarr.Http;
|
||||||
using Sonarr.Http.Mapping;
|
|
||||||
using Sonarr.Http.Validation;
|
using Sonarr.Http.Validation;
|
||||||
|
|
||||||
|
|
||||||
|
@ -9,6 +9,8 @@ namespace NzbDrone.Common.Extensions
|
|||||||
{
|
{
|
||||||
public static class StringExtensions
|
public static class StringExtensions
|
||||||
{
|
{
|
||||||
|
private static readonly Regex CamelCaseRegex = new Regex("(?<!^)[A-Z]", RegexOptions.Compiled);
|
||||||
|
|
||||||
public static string NullSafe(this string target)
|
public static string NullSafe(this string target)
|
||||||
{
|
{
|
||||||
return ((object)target).NullSafe().ToString();
|
return ((object)target).NullSafe().ToString();
|
||||||
@ -133,5 +135,10 @@ public static string FromOctalString(this string octalValue)
|
|||||||
|
|
||||||
return Encoding.ASCII.GetString(new [] { byteResult });
|
return Encoding.ASCII.GetString(new [] { byteResult });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static string SplitCamelCase(this string input)
|
||||||
|
{
|
||||||
|
return CamelCaseRegex.Replace(input, match => " " + match.Value);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
using System;
|
using System;
|
||||||
using System.Collections.Concurrent;
|
using System.Collections.Concurrent;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
@ -15,7 +15,7 @@ namespace NzbDrone.Core.Test.Messaging.Commands
|
|||||||
[TestFixture]
|
[TestFixture]
|
||||||
public class CommandExecutorFixture : TestBase<CommandExecutor>
|
public class CommandExecutorFixture : TestBase<CommandExecutor>
|
||||||
{
|
{
|
||||||
private BlockingCollection<CommandModel> _commandQueue;
|
private CommandQueue _commandQueue;
|
||||||
private Mock<IExecute<CommandA>> _executorA;
|
private Mock<IExecute<CommandA>> _executorA;
|
||||||
private Mock<IExecute<CommandB>> _executorB;
|
private Mock<IExecute<CommandB>> _executorB;
|
||||||
private bool _commandExecuted = false;
|
private bool _commandExecuted = false;
|
||||||
@ -46,7 +46,7 @@ public void TearDown()
|
|||||||
|
|
||||||
private void GivenCommandQueue()
|
private void GivenCommandQueue()
|
||||||
{
|
{
|
||||||
_commandQueue = new BlockingCollection<CommandModel>(new CommandQueue());
|
_commandQueue = new CommandQueue();
|
||||||
|
|
||||||
Mocker.GetMock<IManageCommandQueue>()
|
Mocker.GetMock<IManageCommandQueue>()
|
||||||
.Setup(s => s.Queue(It.IsAny<CancellationToken>()))
|
.Setup(s => s.Queue(It.IsAny<CancellationToken>()))
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
using System.Collections.Generic;
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using FluentAssertions;
|
using FluentAssertions;
|
||||||
using Moq;
|
using Moq;
|
||||||
@ -42,6 +43,10 @@ public void should_not_remove_commands_for_five_minutes_after_they_end()
|
|||||||
{
|
{
|
||||||
var command = Subject.Push<CheckForFinishedDownloadCommand>(new CheckForFinishedDownloadCommand());
|
var command = Subject.Push<CheckForFinishedDownloadCommand>(new CheckForFinishedDownloadCommand());
|
||||||
|
|
||||||
|
// Start the command to mimic CommandQueue's behaviour
|
||||||
|
command.StartedAt = DateTime.Now;
|
||||||
|
command.Status = CommandStatus.Started;
|
||||||
|
|
||||||
Subject.Start(command);
|
Subject.Start(command);
|
||||||
Subject.Complete(command, "All done");
|
Subject.Complete(command, "All done");
|
||||||
Subject.CleanCommands();
|
Subject.CleanCommands();
|
||||||
|
@ -1,9 +1,9 @@
|
|||||||
using NzbDrone.Core.Messaging.Commands;
|
using NzbDrone.Core.Messaging.Commands;
|
||||||
|
|
||||||
namespace NzbDrone.Core.Download
|
namespace NzbDrone.Core.Download
|
||||||
{
|
{
|
||||||
public class CheckForFinishedDownloadCommand : Command
|
public class CheckForFinishedDownloadCommand : Command
|
||||||
{
|
{
|
||||||
|
public override bool RequiresDiskAccess => true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -9,6 +9,7 @@ public class RenameFilesCommand : Command
|
|||||||
public List<int> Files { get; set; }
|
public List<int> Files { get; set; }
|
||||||
|
|
||||||
public override bool SendUpdatesToClient => true;
|
public override bool SendUpdatesToClient => true;
|
||||||
|
public override bool RequiresDiskAccess => true;
|
||||||
|
|
||||||
public RenameFilesCommand()
|
public RenameFilesCommand()
|
||||||
{
|
{
|
||||||
|
@ -8,6 +8,7 @@ public class RenameSeriesCommand : Command
|
|||||||
public List<int> SeriesIds { get; set; }
|
public List<int> SeriesIds { get; set; }
|
||||||
|
|
||||||
public override bool SendUpdatesToClient => true;
|
public override bool SendUpdatesToClient => true;
|
||||||
|
public override bool RequiresDiskAccess => true;
|
||||||
|
|
||||||
public RenameSeriesCommand()
|
public RenameSeriesCommand()
|
||||||
{
|
{
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using NzbDrone.Core.Messaging.Commands;
|
using NzbDrone.Core.Messaging.Commands;
|
||||||
|
|
||||||
namespace NzbDrone.Core.MediaFiles.EpisodeImport.Manual
|
namespace NzbDrone.Core.MediaFiles.EpisodeImport.Manual
|
||||||
@ -8,6 +8,7 @@ public class ManualImportCommand : Command
|
|||||||
public List<ManualImportFile> Files { get; set; }
|
public List<ManualImportFile> Files { get; set; }
|
||||||
|
|
||||||
public override bool SendUpdatesToClient => true;
|
public override bool SendUpdatesToClient => true;
|
||||||
|
public override bool RequiresDiskAccess => true;
|
||||||
|
|
||||||
public ImportMode ImportMode { get; set; }
|
public ImportMode ImportMode { get; set; }
|
||||||
}
|
}
|
||||||
|
@ -21,6 +21,8 @@ public virtual bool SendUpdatesToClient
|
|||||||
|
|
||||||
public virtual bool UpdateScheduledTask => true;
|
public virtual bool UpdateScheduledTask => true;
|
||||||
public virtual string CompletionMessage => "Completed";
|
public virtual string CompletionMessage => "Completed";
|
||||||
|
public virtual bool RequiresDiskAccess => false;
|
||||||
|
public virtual bool IsExclusive => false;
|
||||||
|
|
||||||
public string Name { get; private set; }
|
public string Name { get; private set; }
|
||||||
public DateTime? LastExecutionTime { get; set; }
|
public DateTime? LastExecutionTime { get; set; }
|
||||||
|
@ -1,27 +1,36 @@
|
|||||||
using System;
|
using System;
|
||||||
using System.Collections;
|
using System.Collections;
|
||||||
using System.Collections.Concurrent;
|
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
|
using System.Threading;
|
||||||
|
|
||||||
namespace NzbDrone.Core.Messaging.Commands
|
namespace NzbDrone.Core.Messaging.Commands
|
||||||
{
|
{
|
||||||
public class CommandQueue : IProducerConsumerCollection<CommandModel>
|
public class CommandQueue : IEnumerable
|
||||||
{
|
{
|
||||||
private object Mutex = new object();
|
private readonly object _mutex = new object();
|
||||||
|
private readonly List<CommandModel> _items;
|
||||||
private List<CommandModel> _items;
|
|
||||||
|
|
||||||
public CommandQueue()
|
public CommandQueue()
|
||||||
{
|
{
|
||||||
_items = new List<CommandModel>();
|
_items = new List<CommandModel>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int Count => _items.Count;
|
||||||
|
|
||||||
|
public void Add(CommandModel item)
|
||||||
|
{
|
||||||
|
lock (_mutex)
|
||||||
|
{
|
||||||
|
_items.Add(item);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public IEnumerator<CommandModel> GetEnumerator()
|
public IEnumerator<CommandModel> GetEnumerator()
|
||||||
{
|
{
|
||||||
List<CommandModel> copy = null;
|
List<CommandModel> copy = null;
|
||||||
|
|
||||||
lock (Mutex)
|
lock (_mutex)
|
||||||
{
|
{
|
||||||
copy = new List<CommandModel>(_items);
|
copy = new List<CommandModel>(_items);
|
||||||
}
|
}
|
||||||
@ -34,77 +43,140 @@ IEnumerator IEnumerable.GetEnumerator()
|
|||||||
return GetEnumerator();
|
return GetEnumerator();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void CopyTo(Array array, int index)
|
public List<CommandModel> All()
|
||||||
{
|
{
|
||||||
lock (Mutex)
|
List<CommandModel> rval = null;
|
||||||
|
|
||||||
|
lock (_mutex)
|
||||||
{
|
{
|
||||||
((ICollection)_items).CopyTo(array, index);
|
rval = _items;
|
||||||
|
}
|
||||||
|
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
|
public CommandModel Find(int id)
|
||||||
|
{
|
||||||
|
return All().FirstOrDefault(q => q.Id == id);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void RemoveMany(IEnumerable<CommandModel> commands)
|
||||||
|
{
|
||||||
|
lock (_mutex)
|
||||||
|
{
|
||||||
|
foreach (var command in commands)
|
||||||
|
{
|
||||||
|
_items.Remove(command);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public bool RemoveIfQueued(int id)
|
||||||
|
{
|
||||||
|
var rval = false;
|
||||||
|
|
||||||
|
lock (_mutex)
|
||||||
|
{
|
||||||
|
var command = _items.FirstOrDefault(q => q.Id == id);
|
||||||
|
|
||||||
|
if (command?.Status == CommandStatus.Queued)
|
||||||
|
{
|
||||||
|
_items.Remove(command);
|
||||||
|
rval = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<CommandModel> QueuedOrStarted()
|
||||||
|
{
|
||||||
|
return All().Where(q => q.Status == CommandStatus.Queued || q.Status == CommandStatus.Started)
|
||||||
|
.ToList();
|
||||||
|
}
|
||||||
|
|
||||||
|
public IEnumerable<CommandModel> GetConsumingEnumerable()
|
||||||
|
{
|
||||||
|
return GetConsumingEnumerable(CancellationToken.None);
|
||||||
|
}
|
||||||
|
|
||||||
|
public IEnumerable<CommandModel> GetConsumingEnumerable(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
while (!cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
if (TryGet(out var command))
|
||||||
|
{
|
||||||
|
yield return command;
|
||||||
|
}
|
||||||
|
|
||||||
|
Thread.Sleep(10);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public int Count => _items.Count;
|
public bool TryGet(out CommandModel item)
|
||||||
|
|
||||||
public object SyncRoot => Mutex;
|
|
||||||
|
|
||||||
public bool IsSynchronized => true;
|
|
||||||
|
|
||||||
public void CopyTo(CommandModel[] array, int index)
|
|
||||||
{
|
{
|
||||||
lock (Mutex)
|
var rval = true;
|
||||||
{
|
item = default(CommandModel);
|
||||||
_items.CopyTo(array, index);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public bool TryAdd(CommandModel item)
|
lock (_mutex)
|
||||||
{
|
|
||||||
Add(item);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
public bool TryTake(out CommandModel item)
|
|
||||||
{
|
|
||||||
bool rval = true;
|
|
||||||
lock (Mutex)
|
|
||||||
{
|
{
|
||||||
if (_items.Count == 0)
|
if (_items.Count == 0)
|
||||||
{
|
{
|
||||||
item = default(CommandModel);
|
|
||||||
rval = false;
|
rval = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
item = _items.Where(c => c.Status == CommandStatus.Queued)
|
var startedCommands = _items.Where(c => c.Status == CommandStatus.Started)
|
||||||
.OrderByDescending(c => c.Priority)
|
.ToList();
|
||||||
.ThenBy(c => c.QueuedAt)
|
|
||||||
.First();
|
|
||||||
|
|
||||||
_items.Remove(item);
|
var localItem = _items.Where(c =>
|
||||||
|
{
|
||||||
|
// If an executing command requires disk access don't return a command that
|
||||||
|
// requires disk access. A lower priority or later queued task could be returned
|
||||||
|
// instead, but that will allow other tasks to execute whiule waiting for disk access.
|
||||||
|
if (startedCommands.Any(x => x.Body.RequiresDiskAccess))
|
||||||
|
{
|
||||||
|
return c.Status == CommandStatus.Queued &&
|
||||||
|
!c.Body.RequiresDiskAccess;
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.Status == CommandStatus.Queued;
|
||||||
|
})
|
||||||
|
.OrderByDescending(c => c.Priority)
|
||||||
|
.ThenBy(c => c.QueuedAt)
|
||||||
|
.FirstOrDefault();
|
||||||
|
|
||||||
|
// Nothing queued that meets the requirements
|
||||||
|
if (localItem == null)
|
||||||
|
{
|
||||||
|
rval = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If any executing command is exclusive don't want return another command until it completes.
|
||||||
|
else if (startedCommands.Any(c => c.Body.IsExclusive))
|
||||||
|
{
|
||||||
|
rval = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the next command to execute is exclusive wait for executing commands to complete.
|
||||||
|
// This will prevent other tasks from starting so the exclusive task executes in the order it should.
|
||||||
|
else if (localItem.Body.IsExclusive && startedCommands.Any())
|
||||||
|
{
|
||||||
|
rval = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// A command ready to execute
|
||||||
|
else
|
||||||
|
{
|
||||||
|
localItem.StartedAt = DateTime.UtcNow;
|
||||||
|
localItem.Status = CommandStatus.Started;
|
||||||
|
|
||||||
|
item = localItem;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
public CommandModel[] ToArray()
|
|
||||||
{
|
|
||||||
CommandModel[] rval = null;
|
|
||||||
|
|
||||||
lock (Mutex)
|
|
||||||
{
|
|
||||||
rval = _items.ToArray();
|
|
||||||
}
|
|
||||||
|
|
||||||
return rval;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void Add(CommandModel item)
|
|
||||||
{
|
|
||||||
lock (Mutex)
|
|
||||||
{
|
|
||||||
_items.Add(item);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,15 +1,15 @@
|
|||||||
using System;
|
|
||||||
using System.Collections.Concurrent;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
|
||||||
using System.Threading;
|
|
||||||
using NLog;
|
using NLog;
|
||||||
using NzbDrone.Common;
|
using NzbDrone.Common;
|
||||||
using NzbDrone.Common.Cache;
|
|
||||||
using NzbDrone.Common.EnsureThat;
|
using NzbDrone.Common.EnsureThat;
|
||||||
using NzbDrone.Common.Serializer;
|
using NzbDrone.Common.Serializer;
|
||||||
using NzbDrone.Core.Lifecycle;
|
using NzbDrone.Core.Lifecycle;
|
||||||
using NzbDrone.Core.Messaging.Events;
|
using NzbDrone.Core.Messaging.Events;
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Net;
|
||||||
|
using System.Threading;
|
||||||
|
using NzbDrone.Core.Exceptions;
|
||||||
|
|
||||||
namespace NzbDrone.Core.Messaging.Commands
|
namespace NzbDrone.Core.Messaging.Commands
|
||||||
{
|
{
|
||||||
@ -19,6 +19,7 @@ public interface IManageCommandQueue
|
|||||||
CommandModel Push<TCommand>(TCommand command, CommandPriority priority = CommandPriority.Normal, CommandTrigger trigger = CommandTrigger.Unspecified) where TCommand : Command;
|
CommandModel Push<TCommand>(TCommand command, CommandPriority priority = CommandPriority.Normal, CommandTrigger trigger = CommandTrigger.Unspecified) where TCommand : Command;
|
||||||
CommandModel Push(string commandName, DateTime? lastExecutionTime, CommandPriority priority = CommandPriority.Normal, CommandTrigger trigger = CommandTrigger.Unspecified);
|
CommandModel Push(string commandName, DateTime? lastExecutionTime, CommandPriority priority = CommandPriority.Normal, CommandTrigger trigger = CommandTrigger.Unspecified);
|
||||||
IEnumerable<CommandModel> Queue(CancellationToken cancellationToken);
|
IEnumerable<CommandModel> Queue(CancellationToken cancellationToken);
|
||||||
|
List<CommandModel> All();
|
||||||
CommandModel Get(int id);
|
CommandModel Get(int id);
|
||||||
List<CommandModel> GetStarted();
|
List<CommandModel> GetStarted();
|
||||||
void SetMessage(CommandModel command, string message);
|
void SetMessage(CommandModel command, string message);
|
||||||
@ -26,6 +27,7 @@ public interface IManageCommandQueue
|
|||||||
void Complete(CommandModel command, string message);
|
void Complete(CommandModel command, string message);
|
||||||
void Fail(CommandModel command, string message, Exception e);
|
void Fail(CommandModel command, string message, Exception e);
|
||||||
void Requeue();
|
void Requeue();
|
||||||
|
void Cancel(int id);
|
||||||
void CleanCommands();
|
void CleanCommands();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -35,20 +37,17 @@ public class CommandQueueManager : IManageCommandQueue, IHandle<ApplicationStart
|
|||||||
private readonly IServiceFactory _serviceFactory;
|
private readonly IServiceFactory _serviceFactory;
|
||||||
private readonly Logger _logger;
|
private readonly Logger _logger;
|
||||||
|
|
||||||
private readonly ICached<CommandModel> _commandCache;
|
private readonly CommandQueue _commandQueue;
|
||||||
private readonly BlockingCollection<CommandModel> _commandQueue;
|
|
||||||
|
|
||||||
public CommandQueueManager(ICommandRepository repo,
|
public CommandQueueManager(ICommandRepository repo,
|
||||||
IServiceFactory serviceFactory,
|
IServiceFactory serviceFactory,
|
||||||
ICacheManager cacheManager,
|
|
||||||
Logger logger)
|
Logger logger)
|
||||||
{
|
{
|
||||||
_repo = repo;
|
_repo = repo;
|
||||||
_serviceFactory = serviceFactory;
|
_serviceFactory = serviceFactory;
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
|
|
||||||
_commandCache = cacheManager.GetCache<CommandModel>(GetType());
|
_commandQueue = new CommandQueue();
|
||||||
_commandQueue = new BlockingCollection<CommandModel>(new CommandQueue());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<CommandModel> PushMany<TCommand>(List<TCommand> commands) where TCommand : Command
|
public List<CommandModel> PushMany<TCommand>(List<TCommand> commands) where TCommand : Command
|
||||||
@ -56,8 +55,7 @@ public List<CommandModel> PushMany<TCommand>(List<TCommand> commands) where TCom
|
|||||||
_logger.Trace("Publishing {0} commands", commands.Count);
|
_logger.Trace("Publishing {0} commands", commands.Count);
|
||||||
|
|
||||||
var commandModels = new List<CommandModel>();
|
var commandModels = new List<CommandModel>();
|
||||||
var existingCommands = _commandCache.Values.Where(q => q.Status == CommandStatus.Queued ||
|
var existingCommands = _commandQueue.QueuedOrStarted();
|
||||||
q.Status == CommandStatus.Started).ToList();
|
|
||||||
|
|
||||||
foreach (var command in commands)
|
foreach (var command in commands)
|
||||||
{
|
{
|
||||||
@ -85,7 +83,6 @@ public List<CommandModel> PushMany<TCommand>(List<TCommand> commands) where TCom
|
|||||||
|
|
||||||
foreach (var commandModel in commandModels)
|
foreach (var commandModel in commandModels)
|
||||||
{
|
{
|
||||||
_commandCache.Set(commandModel.Id.ToString(), commandModel);
|
|
||||||
_commandQueue.Add(commandModel);
|
_commandQueue.Add(commandModel);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -122,7 +119,6 @@ public CommandModel Push<TCommand>(TCommand command, CommandPriority priority =
|
|||||||
_logger.Trace("Inserting new command: {0}", commandModel.Name);
|
_logger.Trace("Inserting new command: {0}", commandModel.Name);
|
||||||
|
|
||||||
_repo.Insert(commandModel);
|
_repo.Insert(commandModel);
|
||||||
_commandCache.Set(commandModel.Id.ToString(), commandModel);
|
|
||||||
_commandQueue.Add(commandModel);
|
_commandQueue.Add(commandModel);
|
||||||
|
|
||||||
return commandModel;
|
return commandModel;
|
||||||
@ -142,30 +138,39 @@ public IEnumerable<CommandModel> Queue(CancellationToken cancellationToken)
|
|||||||
return _commandQueue.GetConsumingEnumerable(cancellationToken);
|
return _commandQueue.GetConsumingEnumerable(cancellationToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public List<CommandModel> All()
|
||||||
|
{
|
||||||
|
_logger.Trace("Getting all commands");
|
||||||
|
return _commandQueue.All();
|
||||||
|
}
|
||||||
|
|
||||||
public CommandModel Get(int id)
|
public CommandModel Get(int id)
|
||||||
{
|
{
|
||||||
return _commandCache.Get(id.ToString(), () => FindCommand(_repo.Get(id)));
|
var command = _commandQueue.Find(id);
|
||||||
|
|
||||||
|
if (command == null)
|
||||||
|
{
|
||||||
|
command = _repo.Get(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
return command;
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<CommandModel> GetStarted()
|
public List<CommandModel> GetStarted()
|
||||||
{
|
{
|
||||||
_logger.Trace("Getting started commands");
|
_logger.Trace("Getting started commands");
|
||||||
return _commandCache.Values.Where(c => c.Status == CommandStatus.Started).ToList();
|
return _commandQueue.All().Where(c => c.Status == CommandStatus.Started).ToList();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void SetMessage(CommandModel command, string message)
|
public void SetMessage(CommandModel command, string message)
|
||||||
{
|
{
|
||||||
command.Message = message;
|
command.Message = message;
|
||||||
_commandCache.Set(command.Id.ToString(), command);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void Start(CommandModel command)
|
public void Start(CommandModel command)
|
||||||
{
|
{
|
||||||
command.StartedAt = DateTime.UtcNow;
|
// Marks the command as started in the DB, the queue takes care of marking it as started on it's own
|
||||||
command.Status = CommandStatus.Started;
|
|
||||||
|
|
||||||
_logger.Trace("Marking command as started: {0}", command.Name);
|
_logger.Trace("Marking command as started: {0}", command.Name);
|
||||||
_commandCache.Set(command.Id.ToString(), command);
|
|
||||||
_repo.Start(command);
|
_repo.Start(command);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -189,16 +194,23 @@ public void Requeue()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void Cancel(int id)
|
||||||
|
{
|
||||||
|
if (!_commandQueue.RemoveIfQueued(id))
|
||||||
|
{
|
||||||
|
throw new NzbDroneClientException(HttpStatusCode.Conflict, "Unable to cancel task");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void CleanCommands()
|
public void CleanCommands()
|
||||||
{
|
{
|
||||||
_logger.Trace("Cleaning up old commands");
|
_logger.Trace("Cleaning up old commands");
|
||||||
|
|
||||||
var old = _commandCache.Values.Where(c => c.EndedAt < DateTime.UtcNow.AddMinutes(-5));
|
var commands = _commandQueue.All()
|
||||||
|
.Where(c => c.EndedAt < DateTime.UtcNow.AddMinutes(-5))
|
||||||
|
.ToList();
|
||||||
|
|
||||||
foreach (var command in old)
|
_commandQueue.RemoveMany(commands);
|
||||||
{
|
|
||||||
_commandCache.Remove(command.Id.ToString());
|
|
||||||
}
|
|
||||||
|
|
||||||
_repo.Trim();
|
_repo.Trim();
|
||||||
}
|
}
|
||||||
@ -213,18 +225,6 @@ private dynamic GetCommand(string commandName)
|
|||||||
return Json.Deserialize("{}", commandType);
|
return Json.Deserialize("{}", commandType);
|
||||||
}
|
}
|
||||||
|
|
||||||
private CommandModel FindCommand(CommandModel command)
|
|
||||||
{
|
|
||||||
var cachedCommand = _commandCache.Find(command.Id.ToString());
|
|
||||||
|
|
||||||
if (cachedCommand != null)
|
|
||||||
{
|
|
||||||
command.Message = cachedCommand.Message;
|
|
||||||
}
|
|
||||||
|
|
||||||
return command;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void Update(CommandModel command, CommandStatus status, string message)
|
private void Update(CommandModel command, CommandStatus status, string message)
|
||||||
{
|
{
|
||||||
SetMessage(command, message);
|
SetMessage(command, message);
|
||||||
@ -234,15 +234,14 @@ private void Update(CommandModel command, CommandStatus status, string message)
|
|||||||
command.Status = status;
|
command.Status = status;
|
||||||
|
|
||||||
_logger.Trace("Updating command status");
|
_logger.Trace("Updating command status");
|
||||||
_commandCache.Set(command.Id.ToString(), command);
|
|
||||||
_repo.End(command);
|
_repo.End(command);
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<CommandModel> QueuedOrStarted(string name)
|
private List<CommandModel> QueuedOrStarted(string name)
|
||||||
{
|
{
|
||||||
return _commandCache.Values.Where(q => q.Name == name &&
|
return _commandQueue.QueuedOrStarted()
|
||||||
(q.Status == CommandStatus.Queued ||
|
.Where(q => q.Name == name)
|
||||||
q.Status == CommandStatus.Started)).ToList();
|
.ToList();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void Handle(ApplicationStartedEvent message)
|
public void Handle(ApplicationStartedEvent message)
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Data.SQLite;
|
using System.Data.SQLite;
|
||||||
using NzbDrone.Core.Datastore;
|
using NzbDrone.Core.Datastore;
|
||||||
|
@ -10,6 +10,7 @@ public class BulkMoveSeriesCommand : Command
|
|||||||
public string DestinationRootFolder { get; set; }
|
public string DestinationRootFolder { get; set; }
|
||||||
|
|
||||||
public override bool SendUpdatesToClient => true;
|
public override bool SendUpdatesToClient => true;
|
||||||
|
public override bool RequiresDiskAccess => true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public class BulkMoveSeries : IEquatable<BulkMoveSeries>
|
public class BulkMoveSeries : IEquatable<BulkMoveSeries>
|
||||||
|
@ -9,5 +9,6 @@ public class MoveSeriesCommand : Command
|
|||||||
public string DestinationPath { get; set; }
|
public string DestinationPath { get; set; }
|
||||||
|
|
||||||
public override bool SendUpdatesToClient => true;
|
public override bool SendUpdatesToClient => true;
|
||||||
|
public override bool RequiresDiskAccess => true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,10 +1,11 @@
|
|||||||
using NzbDrone.Core.Messaging.Commands;
|
using NzbDrone.Core.Messaging.Commands;
|
||||||
|
|
||||||
namespace NzbDrone.Core.Update.Commands
|
namespace NzbDrone.Core.Update.Commands
|
||||||
{
|
{
|
||||||
public class ApplicationUpdateCommand : Command
|
public class ApplicationUpdateCommand : Command
|
||||||
{
|
{
|
||||||
public override bool SendUpdatesToClient => true;
|
public override bool SendUpdatesToClient => true;
|
||||||
|
public override bool IsExclusive => true;
|
||||||
|
|
||||||
public override string CompletionMessage => null;
|
public override string CompletionMessage => null;
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,8 @@
|
|||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using NzbDrone.Common;
|
using NzbDrone.Common;
|
||||||
|
using NzbDrone.Common.TPL;
|
||||||
using NzbDrone.Core.Datastore.Events;
|
using NzbDrone.Core.Datastore.Events;
|
||||||
using NzbDrone.Core.Messaging.Commands;
|
using NzbDrone.Core.Messaging.Commands;
|
||||||
using NzbDrone.Core.Messaging.Events;
|
using NzbDrone.Core.Messaging.Events;
|
||||||
@ -17,6 +18,8 @@ public class CommandModule : SonarrRestModuleWithSignalR<CommandResource, Comman
|
|||||||
{
|
{
|
||||||
private readonly IManageCommandQueue _commandQueueManager;
|
private readonly IManageCommandQueue _commandQueueManager;
|
||||||
private readonly IServiceFactory _serviceFactory;
|
private readonly IServiceFactory _serviceFactory;
|
||||||
|
private readonly Debouncer _debouncer;
|
||||||
|
private readonly Dictionary<int, CommandResource> _pendingUpdates;
|
||||||
|
|
||||||
public CommandModule(IManageCommandQueue commandQueueManager,
|
public CommandModule(IManageCommandQueue commandQueueManager,
|
||||||
IBroadcastSignalRMessage signalRBroadcaster,
|
IBroadcastSignalRMessage signalRBroadcaster,
|
||||||
@ -26,9 +29,13 @@ public CommandModule(IManageCommandQueue commandQueueManager,
|
|||||||
_commandQueueManager = commandQueueManager;
|
_commandQueueManager = commandQueueManager;
|
||||||
_serviceFactory = serviceFactory;
|
_serviceFactory = serviceFactory;
|
||||||
|
|
||||||
|
_debouncer = new Debouncer(SendUpdates, TimeSpan.FromSeconds(0.1));
|
||||||
|
_pendingUpdates = new Dictionary<int, CommandResource>();
|
||||||
|
|
||||||
GetResourceById = GetCommand;
|
GetResourceById = GetCommand;
|
||||||
CreateResource = StartCommand;
|
CreateResource = StartCommand;
|
||||||
GetResourceAll = GetStartedCommands;
|
GetResourceAll = GetStartedCommands;
|
||||||
|
DeleteResource = CancelCommand;
|
||||||
|
|
||||||
PostValidator.RuleFor(c => c.Name).NotBlank();
|
PostValidator.RuleFor(c => c.Name).NotBlank();
|
||||||
}
|
}
|
||||||
@ -56,14 +63,44 @@ private int StartCommand(CommandResource commandResource)
|
|||||||
|
|
||||||
private List<CommandResource> GetStartedCommands()
|
private List<CommandResource> GetStartedCommands()
|
||||||
{
|
{
|
||||||
return _commandQueueManager.GetStarted().ToResource();
|
return _commandQueueManager.All().ToResource();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void CancelCommand(int id)
|
||||||
|
{
|
||||||
|
_commandQueueManager.Cancel(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void Handle(CommandUpdatedEvent message)
|
public void Handle(CommandUpdatedEvent message)
|
||||||
{
|
{
|
||||||
if (message.Command.Body.SendUpdatesToClient)
|
if (message.Command.Body.SendUpdatesToClient)
|
||||||
{
|
{
|
||||||
BroadcastResourceChange(ModelAction.Updated, message.Command.ToResource());
|
lock (_pendingUpdates)
|
||||||
|
{
|
||||||
|
_pendingUpdates[message.Command.Id] = message.Command.ToResource();
|
||||||
|
}
|
||||||
|
|
||||||
|
_debouncer.Execute();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void SendUpdates()
|
||||||
|
{
|
||||||
|
lock (_pendingUpdates)
|
||||||
|
{
|
||||||
|
var pendingUpdates = _pendingUpdates.Values.ToArray();
|
||||||
|
_pendingUpdates.Clear();
|
||||||
|
|
||||||
|
foreach (var pendingUpdate in pendingUpdates)
|
||||||
|
{
|
||||||
|
BroadcastResourceChange(ModelAction.Updated, pendingUpdate);
|
||||||
|
|
||||||
|
if (pendingUpdate.Name == typeof(MessagingCleanupCommand).Name.Replace("Command", "") &&
|
||||||
|
pendingUpdate.Status == CommandStatus.Completed)
|
||||||
|
{
|
||||||
|
BroadcastResourceChange(ModelAction.Sync);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,8 @@
|
|||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using Newtonsoft.Json;
|
using Newtonsoft.Json;
|
||||||
|
using NzbDrone.Common.Extensions;
|
||||||
using NzbDrone.Core.Messaging.Commands;
|
using NzbDrone.Core.Messaging.Commands;
|
||||||
using Sonarr.Http.REST;
|
using Sonarr.Http.REST;
|
||||||
|
|
||||||
@ -10,6 +11,7 @@ namespace Sonarr.Api.V3.Commands
|
|||||||
public class CommandResource : RestResource
|
public class CommandResource : RestResource
|
||||||
{
|
{
|
||||||
public string Name { get; set; }
|
public string Name { get; set; }
|
||||||
|
public string CommandName { get; set; }
|
||||||
public string Message { get; set; }
|
public string Message { get; set; }
|
||||||
public Command Body { get; set; }
|
public Command Body { get; set; }
|
||||||
public CommandPriority Priority { get; set; }
|
public CommandPriority Priority { get; set; }
|
||||||
@ -24,37 +26,6 @@ public class CommandResource : RestResource
|
|||||||
[JsonIgnore]
|
[JsonIgnore]
|
||||||
public string CompletionMessage { get; set; }
|
public string CompletionMessage { get; set; }
|
||||||
|
|
||||||
//Legacy
|
|
||||||
public CommandStatus State
|
|
||||||
{
|
|
||||||
get
|
|
||||||
{
|
|
||||||
return Status;
|
|
||||||
}
|
|
||||||
|
|
||||||
set { }
|
|
||||||
}
|
|
||||||
|
|
||||||
public bool Manual
|
|
||||||
{
|
|
||||||
get
|
|
||||||
{
|
|
||||||
return Trigger == CommandTrigger.Manual;
|
|
||||||
}
|
|
||||||
|
|
||||||
set { }
|
|
||||||
}
|
|
||||||
|
|
||||||
public DateTime StartedOn
|
|
||||||
{
|
|
||||||
get
|
|
||||||
{
|
|
||||||
return Queued;
|
|
||||||
}
|
|
||||||
|
|
||||||
set { }
|
|
||||||
}
|
|
||||||
|
|
||||||
public DateTime? StateChangeTime
|
public DateTime? StateChangeTime
|
||||||
{
|
{
|
||||||
get
|
get
|
||||||
@ -106,6 +77,7 @@ public static CommandResource ToResource(this CommandModel model)
|
|||||||
Id = model.Id,
|
Id = model.Id,
|
||||||
|
|
||||||
Name = model.Name,
|
Name = model.Name,
|
||||||
|
CommandName = model.Name.SplitCamelCase(),
|
||||||
Message = model.Message,
|
Message = model.Message,
|
||||||
Body = model.Body,
|
Body = model.Body,
|
||||||
Priority = model.Priority,
|
Priority = model.Priority,
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Text.RegularExpressions;
|
using System.Text.RegularExpressions;
|
||||||
|
using NzbDrone.Common.Extensions;
|
||||||
using NzbDrone.Core.Datastore.Events;
|
using NzbDrone.Core.Datastore.Events;
|
||||||
using NzbDrone.Core.Jobs;
|
using NzbDrone.Core.Jobs;
|
||||||
using NzbDrone.Core.Messaging.Events;
|
using NzbDrone.Core.Messaging.Events;
|
||||||
@ -13,8 +14,6 @@ public class TaskModule : SonarrRestModuleWithSignalR<TaskResource, ScheduledTas
|
|||||||
{
|
{
|
||||||
private readonly ITaskManager _taskManager;
|
private readonly ITaskManager _taskManager;
|
||||||
|
|
||||||
private static readonly Regex NameRegex = new Regex("(?<!^)[A-Z]", RegexOptions.Compiled);
|
|
||||||
|
|
||||||
public TaskModule(ITaskManager taskManager, IBroadcastSignalRMessage broadcastSignalRMessage)
|
public TaskModule(ITaskManager taskManager, IBroadcastSignalRMessage broadcastSignalRMessage)
|
||||||
: base(broadcastSignalRMessage, "system/task")
|
: base(broadcastSignalRMessage, "system/task")
|
||||||
{
|
{
|
||||||
@ -51,7 +50,7 @@ private static TaskResource ConvertToResource(ScheduledTask scheduledTask)
|
|||||||
return new TaskResource
|
return new TaskResource
|
||||||
{
|
{
|
||||||
Id = scheduledTask.Id,
|
Id = scheduledTask.Id,
|
||||||
Name = NameRegex.Replace(taskName, match => " " + match.Value),
|
Name = taskName.SplitCamelCase(),
|
||||||
TaskName = taskName,
|
TaskName = taskName,
|
||||||
Interval = scheduledTask.Interval,
|
Interval = scheduledTask.Interval,
|
||||||
LastExecution = scheduledTask.LastExecution,
|
LastExecution = scheduledTask.LastExecution,
|
||||||
|
Loading…
Reference in New Issue
Block a user