Files
lazarus-ccr/components/flashfiler/sourcelaz/ffllthrd.pas
2016-12-07 13:31:59 +00:00

767 lines
24 KiB
ObjectPascal

{*********************************************************}
{* FlashFiler: Server thread pool & thread classes *}
{*********************************************************}
(* ***** BEGIN LICENSE BLOCK *****
* Version: MPL 1.1
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is TurboPower FlashFiler
*
* The Initial Developer of the Original Code is
* TurboPower Software
*
* Portions created by the Initial Developer are Copyright (C) 1996-2002
* the Initial Developer. All Rights Reserved.
*
* Contributor(s):
*
* ***** END LICENSE BLOCK ***** *)
{$I ffdefine.inc}
unit ffllthrd;
interface
uses
classes,
windows,
ffllBase, {!!.06}
ffllComp; {!!.06}
type
{ This is a type of procedure that may be passed to a thread pool for
processing. The thread pool grabs an available thread or instantiates
a new thread. It then passes the procedure to the thread and the thread
calls the procedure. aProcessCookie is whatever the calling object
wants it to be. }
TffThreadProcessEvent = procedure(const aProcessCookie: longInt) of object;
TffThreadPool = class; { forward declaration }
{ This type of thread is useful for work that must occur on a periodic
basis. This thread frees itself when terminated. }
TffTimerThread = class(TffThread)
protected { private }
FFrequency : DWord;
{-The number of milliseconds between each firing of the timer event. }
FTimerEvent : TffThreadProcessEvent;
{-The routine that is called when the "timer" fires. }
FTimerEventCookie : longInt;
{-The cookie passed to the FProcessEvent. }
FDieEvent : TffEvent;
{-Event raised when a thread is to die. }
protected
procedure Execute; override;
public
constructor Create(const aFrequency : DWord;
aTimerEvent : TffThreadProcessEvent;
const aTimerEventCookie : longInt;
const createSuspended : boolean); virtual;
{ Use this method to create an instance of the thread. Parameters:
- aFrequency is the number of milliseconds that must elapse before the
thread calls aProcessEvent.
- aTimerEvent is the method called when the timer fires.
- aTimerEventCookie is an optional value that is passed to aTimerEvent.
- CreateSuspended allows you to control when the thread starts.
If False then the thread starts immediately. If True then the thread
starts once you call the Resume method. }
destructor Destroy; override;
procedure DieDieDie;
{ Use this method to terminate the timer thread. }
property Frequency : DWord
read FFrequency write FFrequency;
{ The number of milliseconds between each firing of the timer event. }
end;
{ This is the base class for threads associated with pools. The pool's
Process method grabs an available thread or creates a new instance of
this class. It then calls the TffPooledThread.Process method. }
TffPooledThread = class(TffThread)
protected { private }
FDieEvent : TffEvent;
{-Event raised when a thread is to die. }
FProcessCookie : longInt;
{-The cookie passed to the Process method. Used by the Execute
method. }
FProcessEvent : TffThreadProcessEvent;
{-The callback passed to the Process method. Used by the Execute
method. }
FThreadEventHandles: Array[0..1] of THandle;
{-When a thread is created, it pauses in its execute method until it
receives one of two events:
1. Wake up and do some work.
2. Wake up and terminate.
This array stores these two event handles. }
FThreadPool : TffThreadPool;
{-The parent thread pool. }
FWorkEvent : TffEvent;
{-Event raised when a thread is to do work. }
protected
procedure Execute; override;
{ Calls the processEvent stored by the Process method.
Do not call this function directly. Instead, use the Process method. }
procedure ptReturnToPool;
{-Called by the execute method. When the thread has finished its work,
this method has the threadpool return this thread to the list of
inactive threads. If there are pending requests, the threadPool will
assign one to this thread instead of putting the thread back in the
inactive list. }
public
constructor Create(threadPool : TffThreadPool); virtual;
{ Use this method to create the thread and associate it with a thread
pool. }
destructor Destroy; override;
procedure DieDieDie;
{ Use this method to terminate the thread. }
procedure Process(aProcessEvent : TffThreadProcessEvent;
aProcessCookie: longInt);
{ This method is called by the thread pool to perform work. It saves
the process event and cookie then raises an event informing the
thread it has work to do. }
published
end;
{ This class is a generic mechanism for having work performed in a separate
thread. It maintains a pool of threads. It may be instructed to create
an initial number of threads upon startup and to never exceed a certain
number of threads within the pool. It maintains the status of each
thread, placing them in an active or inactive list.
Any type of object may have work performed through one of the pool's thread
by supplying a callback function and cookie (optional) to the pool's
ProcessThreaded method. }
TffThreadPool = class(TffLoggableComponent) {!!.06}
private
FActive : TffList;
{-List of acquired threads. When a thread becomes inactive it is moved
to FInactive. }
FInactive : TffList;
{-List of available threads. When a thread is acquired, it moves to the
FActive list. }
FInitialCount : integer;
{-The maximum number of threads that can be created by the pool. }
FInitialized : boolean;
{-Set to True when the initial threads have been created for the thread
pool. }
FMaxCount : integer;
{-The maximum number of threads to be created by the pool. }
FPendingQueue : TffThreadQueue;
{-Queue of pending requests. Requests wind up here when a thread
is not available to process the request. }
FLock : TffPadlock;
{-Controls access to the threads. }
FSkipInitial : Boolean;
{-Used by the EngineManager expert to keep the pool from creating threads
when InitialCount is set}
protected
function thpGetActiveCount : integer;
{-Return total # of active thread. }
function thpGetFreeCount : integer;
{-Return total # of free thread slots. In other words, the maximum
number of threads minus the total # of active and inactive threads. }
function thpGetInactiveCount : integer;
{-Return total # of inactive threads.}
function thpGetThreadFromPool : TffPooledThread;
{-Used to obtain a thread from the inactive pool. If no thread is
available then this method returns nil. If a thread is available,
the thread is moved from the inactive list to the active list. }
procedure thpPutInQueue(aProcessEvent : TffThreadProcessEvent;
aProcessCookie: longInt);
{-Used to place a request in queue when a thread is not available to
process the request. The request will be picked out of the queue by
the next free thread. }
procedure thpReturnThreadToPool(aThread : TffPooledThread);
{-Called by a thread when it has finished processing. If any requests
are in queue then this method has the newly-available thread process
the request. Otherwise, this method moves the thread from the active
list to the inactive list. }
procedure thpSetInitialCount(const aCount : integer);
{-Called when the initial thread count is set. }
procedure thpSetMaxCount(const aCount : integer);
{-Called when the max thread count is set. }
property SkipInitial : Boolean
read FSkipInitial write FSkipInitial;
{-Used by the EngineManager expert to keep the pool from creating threads
when InitialCount is set}
public
constructor Create(aOwner : TComponent); override;
destructor Destroy; override;
procedure Flush(NumToRetain : integer);
{ Use this method to flush inactive threads from the pool. NumToRetain
is the number of inactive threads to retain in the pool. Active threads
are unaffected by this method. }
procedure ProcessThreaded(aProcessEvent : TffThreadProcessEvent;
aProcessCookie: longInt);
{ Use this method to have a worker thread process a message. The worker
thread calls the specified process event, passing it the specified
process cookie. If a worker thread is not immediately available, this
method will add the message to an internal queue. The next thread that
becomes available will pick up the request from the queue and process
the request. }
property ActiveCount : integer read thpGetActiveCount;
{ The total number of active threads. }
property FreeCount : integer read thpGetFreeCount;
{ The total number of thread slots that are unfilled. Usually
calculated as MaxCount - ActiveCount - InactiveCount. }
property InactiveCount : integer read thpGetInactiveCount;
{ The total number of inactive threads. Does not include free thread
slots that do not contain a thread. }
published
property InitialCount : integer
read FInitialCount write thpSetInitialCount default 5;
{ The initial number of threads to be preloaded by the pool. }
property MaxCount : integer
read FMaxCount write thpSetMaxCount default 16;
{ The maximum number of threads that can be created by the pool. }
end;
{ This type is used to store pending requests in the TffThreadPool. }
TffThreadRequestItem = class(TffSelfListItem)
protected
FProcessCookie : longInt;
FProcessEvent : TffThreadProcessEvent;
public
constructor Create(anEvent : TffThreadProcessEvent;
aCookie : longInt);
property ProcessCookie : longInt read FProcessCookie;
property ProcessEvent : TffThreadProcessEvent read FProcessEvent;
end;
implementation
uses
sysUtils, {!!.06}
// ffllcomp, {Deleted !!.06}
ffllexcp;
{$I ffconst.inc}
{$I ffllscst.inc}
{===TffTimerThread===================================================}
constructor TffTimerThread.Create(const aFrequency : DWord;
aTimerEvent : TffThreadProcessEvent;
const aTimerEventCookie : longInt;
const createSuspended : boolean);
begin
{ Requirement: aTimerEvent must be assigned. }
if not assigned(aTimerEvent) then
RaiseSCErrorCodeFmt(ffsce_ParameterRequired,
['aTimerEvent', ClassName + '.constructor']);
{ Make sure important variables set before the thread is actually started in
the inherited Create. }
FDieEvent := TffEvent.Create;
FFrequency := aFrequency;
FTimerEvent := aTimerEvent;
FTimerEventCookie := aTimerEventCookie;
FreeOnTerminate := False;
inherited Create(createSuspended);
end;
{--------}
destructor TffTimerThread.Destroy;
begin
FDieEvent.Free;
inherited Destroy;
end;
{--------}
procedure TffTimerThread.DieDieDie;
begin
Terminate;
FDieEvent.SignalEvent;
end;
{--------}
procedure TffTimerThread.Execute;
var
aResult : DWORD;
begin
if Terminated then exit;
repeat
aResult := FDieEvent.WaitForQuietly(FFrequency);
if aResult = WAIT_TIMEOUT then
FTimerEvent(FTimerEventCookie)
else
Terminate;
until Terminated;
end;
{====================================================================}
{===TffPooledThread==================================================}
constructor TffPooledThread.Create(threadPool : TffThreadPool);
{ Use this method to create the thread and associate it with a thread
pool. }
begin
inherited Create(False);
FDieEvent := TffEvent.Create;
FProcessCookie := -1;
FProcessEvent := nil;
FThreadPool := threadPool;
FWorkEvent := TffEvent.Create;
FThreadEventHandles[0] := FWorkEvent.Handle;
FThreadEventHandles[1] := FDieEvent.Handle;
FreeOnTerminate := False; { Freed in TffThreadpool.destroy }
end;
{--------}
destructor TffPooledThread.Destroy;
begin
FDieEvent.Free;
FWorkEvent.Free;
inherited Destroy;
end;
{--------}
procedure TffPooledThread.DieDieDie;
begin
Terminate;
FDieEvent.SignalEvent;
end;
{--------}
procedure TffPooledThread.Execute;
var
WaitResult : DWORD;
begin
repeat
{ Wait for something to do or until we are killed. }
WaitResult := WaitForMultipleObjects(2, @FThreadEventHandles,
false, ffcl_INFINITE); {!!.06}
if (WaitResult = WAIT_OBJECT_0) then begin
{ Thread has work to do. }
{Begin !!.06}
try
if assigned(FProcessEvent) then
FProcessEvent(FProcessCookie);
except
on E:Exception do
FThreadPool.lcLog('Exception caught in TffPooledThread.Execute: ' +
E.Message);
end;
{End !!.06}
if not Terminated then
ptReturnToPool;
end;
until Terminated;
end;
{--------}
procedure TffPooledThread.Process(aProcessEvent : TffThreadProcessEvent;
aProcessCookie: longInt);
{ This method is called by the thread pool to perform work. It saves
the process event and cookie then resumes the thread. }
begin
FProcessEvent := aProcessEvent;
FProcessCookie := aProcessCookie;
FWorkEvent.SignalEvent;
end;
{--------}
procedure TffPooledThread.ptReturnToPool;
begin
FThreadPool.thpReturnThreadToPool(Self);
end;
{====================================================================}
{===TffThreadPool====================================================}
constructor TffThreadPool.Create(aOwner : TComponent);
begin
inherited Create(aOwner);
FLock := TffPadlock.Create;
FActive := TffList.Create;
FActive.Sorted := False;
FInactive := TffList.Create;
FInactive.Sorted := False;
FInitialCount := 5;
FInitialized := False;
FMaxCount := 16;
FPendingQueue := TffThreadQueue.Create;
FSkipInitial := False;
end;
{--------}
destructor TffThreadPool.Destroy;
var
anIndex : longInt;
aThread : TffPooledThread;
HandleList : TffHandleList; { list of thread handles }
PHandleArray : pointer;
begin
FFNotifyDependents(ffn_Destroy); {!!.11}
FLock.Lock;
try
HandleList := TffHandleList.Create;
try
if assigned(FActive) then begin
{ Allocate memory for the array of thread handles. }
HandleList.Capacity := FActive.Count;
for anIndex := pred(FActive.Count) downto 0 do begin
aThread := TffPooledThread(TffIntListItem(FActive[anIndex]).KeyAsInt);
HandleList.Append(aThread.Handle);
aThread.DieDieDie;
end;
end;
if assigned(FInactive) then begin
{ Add more memory as needed to array of thread handles. }
HandleList.Capacity := HandleList.Capacity + FInactive.Count;
for anIndex := pred(FInactive.Count) downto 0 do begin
aThread := TffPooledThread(TffIntListItem(FInactive[anIndex]).KeyAsInt);
HandleList.Append(aThread.Handle);
aThread.DieDieDie;
end;
end;
{ Wait for the threads to terminate. }
PHandleArray := HandleList.InternalAddress;
WaitForMultipleObjects(HandleList.Count, pHandleArray, true, 2000);
{ SPW - 7/3/2000 - Note: I tried using the MsgWaitForMultipleObjects (as shown
below) but after awhile it would wait the entire 5 seconds even though all
threads had terminated. Using WaitForMultipleObjects does not appear to
have that kind of problem.
MsgWaitForMultipleObjects(HandleIndex, pHandleArray^, true,
2000, QS_ALLINPUT); }
finally
{ Explicitly remove the handles so that they are not closed before the
thread has had a chance to close the handle. }
HandleList.RemoveAll;
HandleList.Free;
end;
{ Free the threads. }
if assigned(FActive) then
for anIndex := pred(FActive.Count) downto 0 do
TffPooledThread(TffIntListItem(FActive[anIndex]).KeyAsInt).Free;
if assigned(FInactive) then
for anIndex := pred(FInactive.Count) downto 0 do
TffPooledThread(TffIntListItem(FInactive[anIndex]).KeyAsInt).Free;
FPendingQueue.Free;
finally
FActive.Free;
FInactive.Free;
FLock.Unlock;
FLock.Free;
end;
inherited Destroy;
end;
{--------}
procedure TffThreadPool.Flush(NumToRetain : integer);
var
anIndex : integer;
aThread : TffPooledThread;
begin
FLock.Lock;
try
for anIndex := pred(FInactive.Count) downto NumToRetain do begin
aThread := TffPooledThread(TffIntListItem(FInactive[anIndex]).KeyAsInt);
aThread.DieDieDie;
FInactive.DeleteAt(anIndex);
end;
finally
FLock.Unlock;
end;
end;
{--------}
procedure TffThreadPool.ProcessThreaded(aProcessEvent : TffThreadProcessEvent;
aProcessCookie: longInt);
var
aThread : TffPooledThread;
begin
{ Get an available thread. }
aThread := thpGetThreadFromPool;
{ If one is available then have it process the request. }
if assigned(aThread) then
aThread.Process(aProcessEvent, aProcessCookie)
else
{ Otherwise put the request in queue for processing by
the next free thread. }
thpPutInQueue(aProcessEvent, aProcessCookie);
end;
{--------}
function TffThreadPool.thpGetActiveCount : integer;
begin
FLock.Lock;
try
Result := FActive.Count;
finally
FLock.Unlock;
end;
end;
{--------}
function TffThreadPool.thpGetFreeCount : integer;
begin
{ free count := max - (active count + inactive count) }
{ Note there is a small chance for inaccuracy. It is totally
possible that a new thread is activated in between our getting
the active threads count and getting the inactive threads count.
Just in case this question is in your mind, we should only lock
one list at a time. Otherwise we run the risk of deadlock. }
FLock.Lock;
try
Result := FMaxCount - FActive.Count - FInactive.Count;
finally
FLock.Unlock;
end;
end;
{--------}
function TffThreadPool.thpGetInactiveCount : integer;
begin
FLock.Lock;
try
Result := FInactive.Count;
finally
FLock.Unlock;
end;
end;
{--------}
function TffThreadPool.thpGetThreadFromPool : TffPooledThread;
var
aListItem : TffIntListItem;
anIndex : longInt;
begin
Result := nil;
aListItem := nil;
FLock.Lock;
try
{ Is an inactive thread available? }
anIndex := pred(FInactive.Count);
if anIndex >= 0 then begin
{ Yes. Grab the last one and remove it from the inactive list. }
aListItem := TffIntListItem(FInactive[anIndex]);
FInactive.RemoveAt(anIndex);
Result := TffPooledThread(aListItem.KeyAsInt);
end;
{ If we didn't have an inactive thread, see if we can add a new thread.
Note: We do this outside the above try..finally block because GetFreeCount
must obtain read access to both thread lists. }
if not assigned(Result) then
if thpGetFreeCount > 0 then begin
Result := TffPooledThread.Create(Self);
aListItem := TffIntListItem.Create(longInt(Result));
end;
{ Did we obtain a thread? }
if assigned(aListItem) then
{ Yes. Add it to the active list. }
FActive.Insert(aListItem);
finally
FLock.Unlock;
end;
end;
{--------}
procedure TffThreadPool.thpPutInQueue(aProcessEvent : TffThreadProcessEvent;
aProcessCookie: longInt);
var
anItem : TffThreadRequestItem;
begin
anItem := TffThreadRequestItem.Create(aProcessEvent, aProcessCookie);
with FPendingQueue.BeginWrite do
try
Enqueue(anItem);
finally
EndWrite;
end;
end;
{--------}
procedure TffThreadPool.thpReturnThreadToPool(aThread : TffPooledThread);
var
aCookie: longInt;
anEvent : TffThreadProcessEvent;
anItem : TffThreadRequestItem;
aListItem : TffIntListItem;
PendingRequest : boolean;
begin
anEvent := nil;
aCookie := -1;
{ Any pending requests? Note that we are assuming some minor risk here.
The pending queue should only have something in it if all threads
were busy. We can afford to check the queue's count without worrying
about thread-safeness because somebody will pick up the count sooner
or later. }
PendingRequest := False;
if FPendingQueue.Count > 0 then
with FPendingQueue.BeginWrite do
try
PendingRequest := (Count > 0);
{ If we have a pending request then get it. }
if PendingRequest then begin
anItem := TffThreadRequestItem(FPendingQueue.Dequeue);
anEvent := anItem.ProcessEvent;
aCookie := anItem.ProcessCookie;
anItem.Free;
end;
finally
EndWrite;
end;
{ If we had a pending request then handle it. }
if PendingRequest then
aThread.Process(anEvent, aCookie)
else begin
{ Otherwise move this thread to the inactive threads list. }
FLock.Lock;
try
aListItem := TffIntListItem(FActive[FActive.Index(longInt(aThread))]);
FActive.Remove(longInt(aThread));
FInactive.Insert(aListItem);
finally
FLock.Unlock;
end;
end;
end;
{--------}
procedure TffThreadPool.thpSetInitialCount(const aCount : integer);
var
anIndex : integer;
anItem : TffIntListItem;
aThread : TffPooledThread;
begin
if not (csDesigning in ComponentState) and (not FInitialized) and
(not FSkipInitial) then begin
FLock.Lock;
try
{ Create the initial set of threads. }
for anIndex := 1 to aCount do begin
aThread := TffPooledThread.Create(Self);
anItem := TffIntListItem.Create(longInt(aThread));
FInactive.Insert(anItem);
end;
finally
FLock.Unlock;
end;
FInitialized := True;
end;
FInitialCount := aCount;
end;
{--------}
procedure TffThreadPool.thpSetMaxCount(const aCount : integer);
var
anIndex : integer;
aThread : TffPooledThread;
currCount : integer;
delCount : integer;
begin
if not (csDesigning in ComponentState) and (not FSkipInitial) then begin
{ If the maximum is now lower than our initial count then get rid
of some threads. }
currCount := FMaxCount - thpGetFreeCount;
if currCount > aCount then begin
{ Figure out how many threads need to be deleted. }
delCount := currCount - aCount;
FLock.Lock;
try
for anIndex := 1 to delCount do
{ We have to check the count. It is possible we need to
delete more threads than are in the inactive list. Because
we have the inactive list locked, any active threads that finish
can't add themselves back to the inactive list. So we will delete
what we can. }
if FInactive.Count > 0 then begin
aThread := TffPooledThread(TffIntListItem(FInactive[0]).KeyAsInt);
aThread.DieDieDie;
FInactive.DeleteAt(0);
end
else
break;
finally
FLock.Unlock;
end;
end;
end;
FMaxCount := aCount;
end;
{====================================================================}
{===TffThreadRequestItem=============================================}
constructor TffThreadRequestItem.Create(anEvent : TffThreadProcessEvent;
aCookie : longInt);
begin
inherited Create;
FProcessEvent := anEvent;
FProcessCookie := aCookie;
end;
{====================================================================}
end.