You've already forked lazarus-ccr
MultiThreadProcsLaz: fixed typo TaskProcedure, added experimental support for local procs
git-svn-id: https://svn.code.sf.net/p/lazarus-ccr/svn@936 8e941d3f-bd1b-0410-a28a-d453659cc2b4
This commit is contained in:
@ -29,6 +29,9 @@ uses ctypes;
|
||||
|
||||
function GetSystemThreadCount: integer;
|
||||
|
||||
procedure CallLocalProc(Func: pointer; Frame: Pointer; Param1: PtrInt;
|
||||
Param2, Param3: Pointer);inline;
|
||||
|
||||
implementation
|
||||
|
||||
{$IFDEF Linux}
|
||||
@ -80,11 +83,21 @@ end;
|
||||
begin
|
||||
Result:=sysconf(_SC_NPROCESSORS_ONLN);
|
||||
end;
|
||||
|
||||
{$ELSE}
|
||||
begin
|
||||
Result:=1;
|
||||
end;
|
||||
{$ENDIF}
|
||||
|
||||
procedure CallLocalProc(Func: pointer; Frame: Pointer; Param1: PtrInt;
|
||||
Param2, Param3: Pointer); inline;
|
||||
type
|
||||
PointerLocal = procedure(_EBP: Pointer; Param1: PtrInt;
|
||||
Param2, Param3: Pointer);
|
||||
begin
|
||||
PointerLocal(Func)(Frame, Param1, Param2, Param3);
|
||||
end;
|
||||
|
||||
end.
|
||||
|
||||
|
@ -90,6 +90,10 @@ type
|
||||
Item: TMultiThreadProcItem) of object;
|
||||
TMTProcedure = procedure(Index: PtrInt; Data: Pointer;
|
||||
Item: TMultiThreadProcItem);
|
||||
TMTLocalProc = record
|
||||
Proc: Pointer; // must be a local procedure of a procedure (not a method)
|
||||
Frame: Pointer;
|
||||
end;
|
||||
|
||||
{ TProcThreadGroup
|
||||
Each task creates a new group of threads.
|
||||
@ -107,20 +111,21 @@ type
|
||||
TProcThreadGroup = class
|
||||
private
|
||||
FEndIndex: PtrInt;
|
||||
FFirstRunningIndex: PtrInt;
|
||||
FLastRunningIndex: PtrInt;
|
||||
FStarterItem: TMultiThreadProcItem;
|
||||
FMaxThreads: PtrInt;
|
||||
FPool: TProcThreadPool;
|
||||
FStartIndex: PtrInt;
|
||||
FTaskData: Pointer;
|
||||
FNext, FPrev: TProcThreadGroup;
|
||||
FState: TMTPGroupState;
|
||||
FTaskMethod: TMTMethod;
|
||||
FFirstThread: TProcThread;
|
||||
FTaskProcdure: TMTProcedure;
|
||||
FThreadCount: PtrInt;
|
||||
FException: Exception;
|
||||
FFirstRunningIndex: PtrInt;
|
||||
FFirstThread: TProcThread;
|
||||
FLastRunningIndex: PtrInt;
|
||||
FMaxThreads: PtrInt;
|
||||
FNext, FPrev: TProcThreadGroup;
|
||||
FPool: TProcThreadPool;
|
||||
FStarterItem: TMultiThreadProcItem;
|
||||
FStartIndex: PtrInt;
|
||||
FState: TMTPGroupState;
|
||||
FTaskData: Pointer;
|
||||
FTaskLocalProc: TMTLocalProc;
|
||||
FTaskMethod: TMTMethod;
|
||||
FTaskProcedure: TMTProcedure;
|
||||
FThreadCount: PtrInt;
|
||||
procedure AddToList(var First: TProcThreadGroup; ListType: TMTPGroupState); inline;
|
||||
procedure RemoveFromList(var First: TProcThreadGroup); inline;
|
||||
function NeedMoreThreads: boolean; inline;
|
||||
@ -141,7 +146,8 @@ type
|
||||
property LastRunningIndex: PtrInt read FLastRunningIndex; // last started
|
||||
property TaskData: Pointer read FTaskData;
|
||||
property TaskMethod: TMTMethod read FTaskMethod;
|
||||
property TaskProcdure: TMTProcedure read FTaskProcdure;
|
||||
property TaskProcedure: TMTProcedure read FTaskProcedure;
|
||||
property TaskLocalProcedure: TMTLocalProc read FTaskLocalProc;
|
||||
property MaxThreads: PtrInt read FMaxThreads;
|
||||
property StarterItem: TMultiThreadProcItem read FStarterItem;
|
||||
end;
|
||||
@ -149,6 +155,8 @@ type
|
||||
{ TLightWeightThreadPool
|
||||
Group 0 are the inactive threads }
|
||||
|
||||
{ TProcThreadPool }
|
||||
|
||||
TProcThreadPool = class
|
||||
private
|
||||
FMaxThreadCount: PtrInt;
|
||||
@ -163,7 +171,7 @@ type
|
||||
procedure SetMaxThreadCount(const AValue: PtrInt);
|
||||
procedure CleanTerminatedThreads;
|
||||
procedure DoParallelIntern(const AMethod: TMTMethod;
|
||||
const AProc: TMTProcedure;
|
||||
const AProc: TMTProcedure; const ALocalProc: TMTLocalProc;
|
||||
StartIndex, EndIndex: PtrInt;
|
||||
Data: Pointer = nil; MaxThreads: PtrInt = 0);
|
||||
public
|
||||
@ -178,6 +186,11 @@ type
|
||||
procedure DoParallel(const AProc: TMTProcedure;
|
||||
StartIndex, EndIndex: PtrInt;
|
||||
Data: Pointer = nil; MaxThreads: PtrInt = 0); inline;
|
||||
|
||||
// experimental
|
||||
procedure DoParallelLocalProc(const AProc: Pointer;
|
||||
StartIndex, EndIndex: PtrInt;
|
||||
Data: Pointer = nil; MaxThreads: PtrInt = 0); // do not make this inline!
|
||||
public
|
||||
property MaxThreadCount: PtrInt read FMaxThreadCount write SetMaxThreadCount;
|
||||
property ThreadCount: PtrInt read FThreadCount;
|
||||
@ -186,6 +199,9 @@ type
|
||||
var
|
||||
ProcThreadPool: TProcThreadPool = nil;
|
||||
|
||||
const
|
||||
MTLocalProcNil: TMTLocalProc = (Proc: nil; Frame: nil);
|
||||
|
||||
implementation
|
||||
|
||||
{ TMultiThreadProcItem }
|
||||
@ -417,10 +433,12 @@ end;
|
||||
procedure TProcThreadGroup.Run(Index: PtrInt; Data: Pointer;
|
||||
Item: TMultiThreadProcItem); inline;
|
||||
begin
|
||||
if Assigned(FTaskProcdure) then
|
||||
FTaskProcdure(Index,Data,Item)
|
||||
if Assigned(FTaskProcedure) then
|
||||
FTaskProcedure(Index,Data,Item)
|
||||
else if Assigned(FTaskMethod) then
|
||||
FTaskMethod(Index,Data,Item)
|
||||
else
|
||||
FTaskMethod(Index,Data,Item);
|
||||
CallLocalProc(FTaskLocalProc.Proc,FTaskLocalProc.Frame,Index,Data,Item);
|
||||
end;
|
||||
|
||||
procedure TProcThreadGroup.IndexComplete(Index: PtrInt);
|
||||
@ -638,22 +656,33 @@ procedure TProcThreadPool.DoParallel(const AMethod: TMTMethod;
|
||||
StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
|
||||
begin
|
||||
if not Assigned(AMethod) then exit;
|
||||
DoParallelIntern(AMethod,nil,StartIndex,EndIndex,Data,MaxThreads);
|
||||
DoParallelIntern(AMethod,nil,MTLocalProcNil,StartIndex,EndIndex,Data,MaxThreads);
|
||||
end;
|
||||
|
||||
procedure TProcThreadPool.DoParallel(const AProc: TMTProcedure;
|
||||
StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
|
||||
begin
|
||||
if not Assigned(AProc) then exit;
|
||||
DoParallelIntern(nil,AProc,StartIndex,EndIndex,Data,MaxThreads);
|
||||
DoParallelIntern(nil,AProc,MTLocalProcNil,StartIndex,EndIndex,Data,MaxThreads);
|
||||
end;
|
||||
|
||||
procedure TProcThreadPool.DoParallelLocalProc(const AProc: Pointer; StartIndex,
|
||||
EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
|
||||
var
|
||||
LocalProc: TMTLocalProc;
|
||||
begin
|
||||
if not Assigned(AProc) then exit;
|
||||
LocalProc.Proc:=AProc;
|
||||
LocalProc.Frame:=get_caller_frame(get_frame);
|
||||
DoParallelIntern(nil,nil,LocalProc,StartIndex,EndIndex,Data,MaxThreads);
|
||||
end;
|
||||
|
||||
procedure TProcThreadPool.DoParallelIntern(const AMethod: TMTMethod;
|
||||
const AProc: TMTProcedure;
|
||||
const AProc: TMTProcedure; const ALocalProc: TMTLocalProc;
|
||||
StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt);
|
||||
var
|
||||
Group: TProcThreadGroup;
|
||||
i: PtrInt;
|
||||
Index: PtrInt;
|
||||
AThread: TProcThread;
|
||||
NewThread: Boolean;
|
||||
Item: TMultiThreadProcItem;
|
||||
@ -668,9 +697,14 @@ begin
|
||||
// single threaded
|
||||
Item:=TMultiThreadProcItem.Create;
|
||||
try
|
||||
for i:=StartIndex to EndIndex do begin
|
||||
Item.FIndex:=i;
|
||||
AMethod(i,Data,Item);
|
||||
for Index:=StartIndex to EndIndex do begin
|
||||
Item.FIndex:=Index;
|
||||
if Assigned(AProc) then
|
||||
AProc(Index,Data,Item)
|
||||
else if Assigned(AMethod) then
|
||||
AMethod(Index,Data,Item)
|
||||
else
|
||||
CallLocalProc(ALocalProc.Proc,ALocalProc.Frame,Index,Data,Item);
|
||||
end;
|
||||
finally
|
||||
Item.Free;
|
||||
@ -683,7 +717,8 @@ begin
|
||||
Group.FPool:=Self;
|
||||
Group.FTaskData:=Data;
|
||||
Group.FTaskMethod:=AMethod;
|
||||
Group.FTaskProcdure:=AProc;
|
||||
Group.FTaskProcedure:=AProc;
|
||||
Group.FTaskLocalProc:=ALocalProc;
|
||||
Group.FStartIndex:=StartIndex;
|
||||
Group.FEndIndex:=EndIndex;
|
||||
Group.FFirstRunningIndex:=StartIndex;
|
||||
@ -727,25 +762,25 @@ begin
|
||||
end;
|
||||
|
||||
// run until no more Index left
|
||||
i:=StartIndex;
|
||||
Index:=StartIndex;
|
||||
repeat
|
||||
Group.FStarterItem.FIndex:=i;
|
||||
Group.Run(i,Data,Group.FStarterItem);
|
||||
Group.FStarterItem.FIndex:=Index;
|
||||
Group.Run(Index,Data,Group.FStarterItem);
|
||||
|
||||
EnterPoolCriticalSection;
|
||||
try
|
||||
Group.IndexComplete(i);
|
||||
Group.IndexComplete(Index);
|
||||
if (Group.FLastRunningIndex<Group.EndIndex) and (Group.FState<>mtpgsException)
|
||||
then begin
|
||||
inc(Group.FLastRunningIndex);
|
||||
i:=Group.FLastRunningIndex;
|
||||
Index:=Group.FLastRunningIndex;
|
||||
end else begin
|
||||
i:=StartIndex;
|
||||
Index:=StartIndex;
|
||||
end;
|
||||
finally
|
||||
LeavePoolCriticalSection;
|
||||
end;
|
||||
until i=StartIndex;
|
||||
until Index=StartIndex;
|
||||
finally
|
||||
// wait for Group to finish
|
||||
if Group.FFirstThread<>nil then begin
|
||||
@ -762,11 +797,11 @@ begin
|
||||
LeavePoolCriticalSection;
|
||||
end;
|
||||
// waiting with exponential spin lock
|
||||
i:=0;
|
||||
Index:=0;
|
||||
while Group.FFirstThread<>nil do begin
|
||||
sleep(i);
|
||||
i:=i*2+1;
|
||||
if i>30 then i:=30;
|
||||
sleep(Index);
|
||||
Index:=Index*2+1;
|
||||
if Index>30 then Index:=30;
|
||||
end;
|
||||
end;
|
||||
// remove group from pool
|
||||
|
@ -11,18 +11,19 @@
|
||||
<CompilerPath Value="$(CompPath)"/>
|
||||
</Other>
|
||||
</CompilerOptions>
|
||||
<Version Major="1" Release="1"/>
|
||||
<Files Count="3">
|
||||
<Item1>
|
||||
<Filename Value="mtprocs.pas"/>
|
||||
<UnitName Value="mtprocs"/>
|
||||
<UnitName Value="MTProcs"/>
|
||||
</Item1>
|
||||
<Item2>
|
||||
<Filename Value="mtputils.pas"/>
|
||||
<UnitName Value="mtputils"/>
|
||||
<UnitName Value="MTPUtils"/>
|
||||
</Item2>
|
||||
<Item3>
|
||||
<Filename Value="mtpcpu.pas"/>
|
||||
<UnitName Value="mtpcpu"/>
|
||||
<UnitName Value="MTPCPU"/>
|
||||
</Item3>
|
||||
</Files>
|
||||
<Type Value="RunAndDesignTime"/>
|
||||
@ -33,7 +34,8 @@
|
||||
</Item1>
|
||||
</RequiredPkgs>
|
||||
<UsageOptions>
|
||||
<UnitPath Value="$(PkgOutDir)"/>
|
||||
<CustomOptions Value="-dUseCThreads"/>
|
||||
<UnitPath Value="$(PkgOutDir)/"/>
|
||||
</UsageOptions>
|
||||
<PublishOptions>
|
||||
<Version Value="2"/>
|
||||
|
Reference in New Issue
Block a user