diff --git a/components/multithreadprocs/examples/testmtp1.lpi b/components/multithreadprocs/examples/testmtp1.lpi new file mode 100644 index 000000000..094fa653b --- /dev/null +++ b/components/multithreadprocs/examples/testmtp1.lpi @@ -0,0 +1,46 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/components/multithreadprocs/examples/testmtp1.lpr b/components/multithreadprocs/examples/testmtp1.lpr new file mode 100644 index 000000000..d2762df63 --- /dev/null +++ b/components/multithreadprocs/examples/testmtp1.lpr @@ -0,0 +1,367 @@ +program TestMTP1; + +{$mode objfpc}{$H+} + +uses + {$IFDEF UNIX} + cthreads, cmem, + {$ENDIF} + Math, SysUtils, Classes, MTProcs, MTPUtils, MultiThreadProcsLaz; + +type + + { TTestItem } + + TTestItem = class + private + FIndex: int64; + public + property Index: int64 read FIndex; + constructor Create(NewIndex: int64); + end; + + { TTests } + + TTests = class + public + procedure Work(Seconds: integer); + + // RTLeventSetEvent, RTLeventWaitFor + procedure TestRTLevent_Set_WaitFor; + + // single thread test + procedure TestSingleThread; + procedure MTPLoop_TestSingleThread(Index: PtrInt; Data: Pointer; + Item: TMultiThreadProcItem); + + // two threads test: run once + procedure TestTwoThreads1; + procedure MTPLoop_TestTwoThreads1(Index: PtrInt; Data: Pointer; + Item: TMultiThreadProcItem); + + // 0 runs two seconds, + // 1 runs a second then waits for 0 then runs a second + // 2 runs a second then waits for 1 + // 3 waits for 0 + // 4 waits for 1 + // 5 waits for 2 + procedure TestMTPWaitForIndex; + procedure MTPLoop_TestMTPWaitForIndex(Index: PtrInt; Data: Pointer; + Item: TMultiThreadProcItem); + + // two threads test: various run times + procedure TestMTPTwoThreads2; + procedure MTPLoop_TestTwoThreads2(Index: PtrInt; Data: Pointer; + Item: TMultiThreadProcItem); + + // test exception in starter thread + procedure TestMTPExceptionInStarterThread; + procedure MTPLoop_TestExceptionInStarterThread(Index: PtrInt; Data: Pointer; + Item: TMultiThreadProcItem); + + // test exception in helper thread + procedure TestMTPExceptionInHelperThread; + procedure MTPLoop_TestExceptionInHelperThread(Index: PtrInt; Data: Pointer; + Item: TMultiThreadProcItem); + + // test parallel sort + procedure TestMTPSort; + procedure MTPLoop_TestDoubleMTPSort(Index: PtrInt; Data: Pointer; + Item: TMultiThreadProcItem); + end; + +{ TTestItem } + +constructor TTestItem.Create(NewIndex: int64); +begin + FIndex:=NewIndex; +end; + +{ TTests } + +procedure TTests.Work(Seconds: integer); +var + Start: TDateTime; +begin + Start:=Now; + while (Now-Start)*864002 then begin + WriteLn('TTests.MTPLoop_TestMTPWaitForIndex Index='+IntToStr(Index)+' ERROR: waited for '+IntToStr(OtherIndex)+' failed: OtherState='+IntToStr(PInteger(Data)[OtherIndex])); + end; + end; + +begin + WriteLn('TTests.MTPLoop_TestMTPWaitForIndex Index='+IntToStr(Index)+' START'); + if PInteger(Data)[Index]<>0 then begin + WriteLn('TTests.MTPLoop_TestMTPWaitForIndex Index='+IntToStr(Index)+' ERROR: IndexState='+IntToStr(PInteger(Data)[Index])); + end; + PInteger(Data)[Index]:=1; + case Index of + 0: Work(2); + 1:begin + Work(1); + WaitFor(0); + Work(1); + end; + 2:begin + Work(1); + WaitFor(1); + end; + 3:begin + WaitFor(0); + end; + 4:begin + WaitFor(1); + end; + 5:begin + WaitFor(2); + end; + end; + WriteLn('TTests.MTPLoop_TestMTPWaitForIndex Index='+IntToStr(Index)+' END'); + PInteger(Data)[Index]:=2; +end; + +procedure TTests.TestMTPTwoThreads2; +begin + WriteLn('TTests.TestMTPTwoThreads1 START'); + ProcThreadPool.DoParallel(@MTPLoop_TestTwoThreads2,1,6,nil,2); + WriteLn('TTests.TestMTPTwoThreads1 END'); +end; + +procedure TTests.MTPLoop_TestTwoThreads2(Index: PtrInt; Data: Pointer; + Item: TMultiThreadProcItem); +var + i: Integer; +begin + for i:=1 to (Index mod 3)+1 do begin + WriteLn('TTests.MTPLoop_TestTwoThreads1 Index=',Index,' i=',i,' ID=',PtrUint(GetThreadID)); + Work(1); + end; +end; + +type + TMyException = class(Exception); + +procedure TTests.TestMTPExceptionInStarterThread; +var + IndexStates: PInteger; +begin + WriteLn('TTests.TestMTPExceptionInStarterThread START'); + ProcThreadPool.MaxThreadCount:=8; + IndexStates:=nil; + GetMem(IndexStates,SizeOf(Integer)*10); + FillByte(IndexStates^,SizeOf(Integer)*10,0); + try + ProcThreadPool.DoParallel(@MTPLoop_TestExceptionInStarterThread,1,3,IndexStates,2); + except + on E: Exception do begin + WriteLn('TTests.TestMTPExceptionInHelperThread E.ClassName=',E.ClassName,' E.Message=',E.Message); + end; + end; + FreeMem(IndexStates); + WriteLn('TTests.TestMTPExceptionInStarterThread END'); +end; + +procedure TTests.MTPLoop_TestExceptionInStarterThread(Index: PtrInt; + Data: Pointer; Item: TMultiThreadProcItem); +begin + WriteLn('TTests.MTPLoop_TestExceptionInStarterThread START Index='+IntToStr(Index)); + if PInteger(Data)[Index]<>0 then + WriteLn('TTests.MTPLoop_TestExceptionInStarterThread Index='+IntToStr(Index)+' ERROR: IndexState='+IntToStr(PInteger(Data)[Index])); + PInteger(Data)[Index]:=1; + case Index of + 1: + begin + // Main Thread + Work(1); + WriteLn('TTests.MTPLoop_TestExceptionInStarterThread raising exception in Index='+IntToStr(Index)+' ...'); + raise Exception.Create('Exception in starter thread'); + end; + else + Work(Index); + end; + PInteger(Data)[Index]:=2; + WriteLn('TTests.MTPLoop_TestExceptionInStarterThread END Index='+IntToStr(Index)); +end; + +procedure TTests.TestMTPExceptionInHelperThread; +var + IndexStates: PInteger; +begin + WriteLn('TTests.TestMTPExceptionInHelperThread START'); + ProcThreadPool.MaxThreadCount:=8; + IndexStates:=nil; + GetMem(IndexStates,SizeOf(Integer)*10); + FillByte(IndexStates^,SizeOf(Integer)*10,0); + try + ProcThreadPool.DoParallel(@MTPLoop_TestExceptionInHelperThread,1,3,IndexStates,2); + except + on E: Exception do begin + WriteLn('TTests.TestMTPExceptionInHelperThread E.ClassName=',E.ClassName,' E.Message=',E.Message); + end; + end; + FreeMem(IndexStates); + WriteLn('TTests.TestMTPExceptionInHelperThread END'); +end; + +procedure TTests.MTPLoop_TestExceptionInHelperThread(Index: PtrInt; + Data: Pointer; Item: TMultiThreadProcItem); +begin + WriteLn('TTests.MTPLoop_TestExceptionInHelperThread START Index='+IntToStr(Index)); + if PInteger(Data)[Index]<>0 then + WriteLn('TTests.MTPLoop_TestExceptionInHelperThread Index='+IntToStr(Index)+' ERROR: IndexState='+IntToStr(PInteger(Data)[Index])); + PInteger(Data)[Index]:=1; + case Index of + 2: + begin + // Helper Thread 2 + Work(1); + WriteLn('TTests.MTPLoop_TestExceptionInHelperThread raising exception in Index='+IntToStr(Index)+' ...'); + raise TMyException.Create('Exception in helper thread'); + end; + else + Work(Index+1); + end; + PInteger(Data)[Index]:=2; + WriteLn('TTests.MTPLoop_TestExceptionInHelperThread END Index='+IntToStr(Index)); +end; + +function CompareTestItems(Data1, Data2: Pointer): integer; +begin + if TTestItem(Data1).Index>TTestItem(Data2).Index then + Result:=1 + else if TTestItem(Data1).Index0 then raise Exception.Create('not sorted'); + + for i:=0 to List.Count-1 do + TObject(List[i]).Free; + List.Free; +end; + +var + Tests: TTests; +begin + writeln('threads=',ProcThreadPool.MaxThreadCount); + ProcThreadPool.MaxThreadCount:=8; + Tests:=TTests.Create; + //Tests.Test1; + //Tests.Test2; + //Tests.TestTwoThreads2; + //Tests.TestRTLevent_Set_WaitFor; + //Tests.TestMTPWaitForIndex; + //Tests.TestMTPExceptionInStarterThread; + Tests.TestMTPExceptionInHelperThread; + //Tests.TestMTPSort; + Tests.Free; +end. + diff --git a/components/multithreadprocs/mtpcpu.pas b/components/multithreadprocs/mtpcpu.pas new file mode 100644 index 000000000..93e864e28 --- /dev/null +++ b/components/multithreadprocs/mtpcpu.pas @@ -0,0 +1,90 @@ +{ System depending code for light weight threads. + + This file is part of the Free Pascal run time library. + + Copyright (C) 2008 Mattias Gaertner mattias@freepascal.org + + See the file COPYING.FPC, included in this distribution, + for details about the copyright. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + + **********************************************************************} +unit MTPCPU; + +{$mode objfpc}{$H+} + +interface + +{$IF defined(windows)} +uses Windows; +{$ELSEIF defined(freebsd) or defined(darwin)} +uses ctypes, sysctl; +{$ELSEIF defined(linux)} +{$linklib c} +uses ctypes; +{$ENDIF} + +function GetSystemThreadCount: integer; + +implementation + +{$IFDEF Linux} +const _SC_NPROCESSORS_ONLN = 83; +function sysconf(i: cint): clong; cdecl; external name 'sysconf'; +{$ENDIF} + +function GetSystemThreadCount: integer; +// returns a good default for the number of threads on this system +{$IF defined(windows)} +//returns total number of processors available to system including logical hyperthreaded processors +var + i: Integer; + ProcessAffinityMask, SystemAffinityMask: DWORD; + Mask: DWORD; + SystemInfo: SYSTEM_INFO; +begin + if GetProcessAffinityMask(GetCurrentProcess, ProcessAffinityMask, SystemAffinityMask) + then begin + Result := 0; + for i := 0 to 31 do begin + Mask := 1 shl i; + if (ProcessAffinityMask and Mask)<>0 then + inc(Result); + end; + end else begin + //can't get the affinity mask so we just report the total number of processors + GetSystemInfo(SystemInfo); + Result := SystemInfo.dwNumberOfProcessors; + end; +end; +{$ELSEIF defined(UNTESTEDsolaris)} + begin + t = sysconf(_SC_NPROC_ONLN); + end; +{$ELSEIF defined(freebsd) or defined(darwin)} +var + mib: array[0..1] of cint; + len: cint; + t: cint; +begin + mib[0] := CTL_HW; + mib[1] := HW_NCPU; + len := sizeof(t); + fpsysctl(pchar(@mib), 2, @t, @len, Nil, 0); + Result:=t; +end; +{$ELSEIF defined(linux)} + begin + Result:=sysconf(_SC_NPROCESSORS_ONLN); + end; +{$ELSE} + begin + Result:=1; + end; +{$ENDIF} + +end. + diff --git a/components/multithreadprocs/mtprocs.pas b/components/multithreadprocs/mtprocs.pas new file mode 100644 index 000000000..97bed85b0 --- /dev/null +++ b/components/multithreadprocs/mtprocs.pas @@ -0,0 +1,796 @@ +{ Unit for light weight threads. + + This file is part of the Free Pascal run time library. + + Copyright (C) 2008 Mattias Gaertner mattias@freepascal.org + + See the file COPYING.FPC, included in this distribution, + for details about the copyright. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + + **********************************************************************} +{ + Abstract: + Light weight threads. + This unit provides methods to easily run a procedure/method with several + threads at once. +} +unit MTProcs; + +{$mode objfpc}{$H+} + +{$inline on} + +interface + +uses + Classes, SysUtils, MTPCPU; + +type + TProcThreadGroup = class; + TProcThreadPool = class; + + { TMultiThreadProcItem } + + TMTPThreadState = ( + mtptsNone, + mtptsActive, + mtptsWaitingForIndex, + mtptsWaitingFailed, + mtptsInactive, + mtptsTerminated + ); + + TMultiThreadProcItem = class + private + FGroup: TProcThreadGroup; + FIndex: PtrInt; + FWaitingForIndexEnd: PtrInt; + FWaitingForIndexStart: PtrInt; + fWaitForPool: PRTLEvent; + FState: TMTPThreadState; + public + destructor Destroy; override; + function WaitForIndexRange(StartIndex, EndIndex: PtrInt): boolean; + function WaitForIndex(Index: PtrInt): boolean; inline; + property Index: PtrInt read FIndex; + property Group: TProcThreadGroup read FGroup; + property WaitingForIndexStart: PtrInt read FWaitingForIndexStart; + property WaitingForIndexEnd: PtrInt read FWaitingForIndexEnd; + end; + + { TProcThread } + + TMTPThreadList = ( + mtptlPool, + mtptlGroup + ); + + TProcThread = class(TThread) + private + FItem: TMultiThreadProcItem; + FNext, FPrev: array[TMTPThreadList] of TProcThread; + procedure AddToList(var First: TProcThread; ListType: TMTPThreadList); inline; + procedure RemoveFromList(var First: TProcThread; ListType: TMTPThreadList); inline; + procedure Terminating(aPool: TProcThreadPool; E: Exception); + public + constructor Create; + destructor Destroy; override; + procedure Execute; override; + property Item: TMultiThreadProcItem read FItem; + end; + + TMTMethod = procedure(Index: PtrInt; Data: Pointer; + Item: TMultiThreadProcItem) of object; + TMTProcedure = procedure(Index: PtrInt; Data: Pointer; + Item: TMultiThreadProcItem); + + { TProcThreadGroup + Each task creates a new group of threads. + A group can either need more threads or it has finished and waits for its + threads to end. + The thread that created the group is not in the list FFirstThread. } + + TMTPGroupState = ( + mtpgsNone, + mtpgsNeedThreads, // the groups waiting for more threads to help + mtpgsFinishing, // the groups waiting for its threads to finish + mtpgsException // there was an exception => close asap + ); + + TProcThreadGroup = class + private + FEndIndex: PtrInt; + FFirstRunningIndex: PtrInt; + FLastRunningIndex: PtrInt; + FStarterItem: TMultiThreadProcItem; + FMaxThreads: PtrInt; + FPool: TProcThreadPool; + FStartIndex: PtrInt; + FTaskData: Pointer; + FNext, FPrev: TProcThreadGroup; + FState: TMTPGroupState; + FTaskMethod: TMTMethod; + FFirstThread: TProcThread; + FTaskProcdure: TMTProcedure; + FThreadCount: PtrInt; + FException: Exception; + procedure AddToList(var First: TProcThreadGroup; ListType: TMTPGroupState); inline; + procedure RemoveFromList(var First: TProcThreadGroup); inline; + function NeedMoreThreads: boolean; inline; + procedure AddThread(AThread: TProcThread); + procedure RemoveThread(AThread: TProcThread); inline; + procedure Run(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem); inline; + procedure IndexComplete(Index: PtrInt); + procedure WakeThreadsWaitingForIndex; + function HasFinishedIndex(aStartIndex, aEndIndex: PtrInt): boolean; + procedure EnterExceptionState(E: Exception); + public + constructor Create; + destructor Destroy; override; + property Pool: TProcThreadPool read FPool; + property StartIndex: PtrInt read FStartIndex; + property EndIndex: PtrInt read FEndIndex; + property FirstRunningIndex: PtrInt read FFirstRunningIndex; // first started + property LastRunningIndex: PtrInt read FLastRunningIndex; // last started + property TaskData: Pointer read FTaskData; + property TaskMethod: TMTMethod read FTaskMethod; + property TaskProcdure: TMTProcedure read FTaskProcdure; + property MaxThreads: PtrInt read FMaxThreads; + property StarterItem: TMultiThreadProcItem read FStarterItem; + end; + + { TLightWeightThreadPool + Group 0 are the inactive threads } + + TProcThreadPool = class + private + FMaxThreadCount: PtrInt; + FThreadCount: PtrInt; + FFirstInactiveThread: TProcThread; + FFirstActiveThread: TProcThread; + FFirstTerminatedThread: TProcThread; + FFirstGroupNeedThreads: TProcThreadGroup; + FFirstGroupFinishing: TProcThreadGroup; + FCritSection: TRTLCriticalSection; + FDestroying: boolean; + procedure SetMaxThreadCount(const AValue: PtrInt); + procedure CleanTerminatedThreads; + procedure DoParallelIntern(const AMethod: TMTMethod; + const AProc: TMTProcedure; + StartIndex, EndIndex: PtrInt; + Data: Pointer = nil; MaxThreads: PtrInt = 0); + public + constructor Create; + destructor Destroy; override; + procedure EnterPoolCriticalSection; inline; + procedure LeavePoolCriticalSection; inline; + + procedure DoParallel(const AMethod: TMTMethod; + StartIndex, EndIndex: PtrInt; + Data: Pointer = nil; MaxThreads: PtrInt = 0); inline; + procedure DoParallel(const AProc: TMTProcedure; + StartIndex, EndIndex: PtrInt; + Data: Pointer = nil; MaxThreads: PtrInt = 0); inline; + public + property MaxThreadCount: PtrInt read FMaxThreadCount write SetMaxThreadCount; + property ThreadCount: PtrInt read FThreadCount; + end; + +var + ProcThreadPool: TProcThreadPool = nil; + +implementation + +{ TMultiThreadProcItem } + +destructor TMultiThreadProcItem.Destroy; +begin + if fWaitForPool<>nil then begin + RTLeventdestroy(fWaitForPool); + fWaitForPool:=nil; + end; + inherited Destroy; +end; + +function TMultiThreadProcItem.WaitForIndexRange( + StartIndex, EndIndex: PtrInt): boolean; +var + aPool: TProcThreadPool; +begin + //WriteLn('TLightWeightThreadItem.WaitForIndexRange START Index='+IntToStr(Index)+' StartIndex='+IntToStr(StartIndex)+' EndIndex='+IntToStr(EndIndex)); + if (EndIndex>=Index) then exit(false); + if EndIndexnil then + FNext[ListType].FPrev[ListType]:=Self; + First:=Self; +end; + +procedure TProcThread.RemoveFromList(var First: TProcThread; + ListType: TMTPThreadList); +begin + if First=Self then + First:=FNext[ListType]; + if FNext[ListType]<>nil then + FNext[ListType].FPrev[ListType]:=FPrev[ListType]; + if FPrev[ListType]<>nil then + FPrev[ListType].FNext[ListType]:=FNext[ListType]; + FNext[ListType]:=nil; + FPrev[ListType]:=nil; +end; + +procedure TProcThread.Terminating(aPool: TProcThreadPool; + E: Exception); +begin + aPool.EnterPoolCriticalSection; + try + // remove from group + if Item.FGroup<>nil then begin + // an exception occured + Item.FGroup.EnterExceptionState(E); + Item.FGroup.RemoveThread(Self); + Item.FGroup:=nil; + end; + // move to pool's terminated threads + case Item.FState of + mtptsActive: RemoveFromList(aPool.FFirstActiveThread,mtptlPool); + mtptsInactive: RemoveFromList(aPool.FFirstInactiveThread,mtptlPool); + end; + AddToList(aPool.FFirstTerminatedThread,mtptlPool); + Item.FState:=mtptsTerminated; + finally + aPool.LeavePoolCriticalSection; + end; +end; + +constructor TProcThread.Create; +begin + inherited Create(true); + fItem:=TMultiThreadProcItem.Create; + fItem.fWaitForPool:=RTLEventCreate; +end; + +destructor TProcThread.Destroy; +begin + FreeAndNil(FItem); + inherited Destroy; +end; + +procedure TProcThread.Execute; +var + aPool: TProcThreadPool; + Group: TProcThreadGroup; + ok: Boolean; + E: Exception; +begin + Group:=Item.Group; + aPool:=Group.Pool; + ok:=false; + try + repeat + // work + Group.Run(Item.Index,Group.TaskData,Item); + + aPool.EnterPoolCriticalSection; + try + Group.IndexComplete(Item.Index); + + // find next work + if Group.LastRunningIndexnil then begin + // add to new group + aPool.FFirstGroupNeedThreads.AddThread(Self); + Group:=Item.Group; + end else begin + // mark inactive + RemoveFromList(aPool.FFirstActiveThread,mtptlPool); + AddToList(aPool.FFirstInactiveThread,mtptlPool); + Item.FState:=mtptsInactive; + RTLeventResetEvent(Item.fWaitForPool); + end; + end; + finally + aPool.LeavePoolCriticalSection; + end; + // wait for new work + if Item.FState=mtptsInactive then + RTLeventWaitFor(Item.fWaitForPool); + until Group=nil; + ok:=true; + except + // stop the exception and store it + E:=Exception(AcquireExceptionObject); + Terminating(aPool,E); + end; + if ok then + Terminating(aPool,nil); +end; + +{ TProcThreadGroup } + +procedure TProcThreadGroup.AddToList(var First: TProcThreadGroup; + ListType: TMTPGroupState); +begin + FNext:=First; + if FNext<>nil then + FNext.FPrev:=Self; + First:=Self; + FState:=ListType; +end; + +procedure TProcThreadGroup.RemoveFromList( + var First: TProcThreadGroup); +begin + if First=Self then + First:=FNext; + if FNext<>nil then + FNext.FPrev:=FPrev; + if FPrev<>nil then + FPrev.FNext:=FNext; + FNext:=nil; + FPrev:=nil; + FState:=mtpgsNone; +end; + +function TProcThreadGroup.NeedMoreThreads: boolean; +begin + Result:=(FLastRunningIndexmtpgsException); +end; + +procedure TProcThreadGroup.AddThread(AThread: TProcThread); +begin + AThread.Item.FGroup:=Self; + AThread.AddToList(FFirstThread,mtptlGroup); + inc(FThreadCount); + inc(FLastRunningIndex); + AThread.Item.FIndex:=FLastRunningIndex; + if not NeedMoreThreads then begin + RemoveFromList(Pool.FFirstGroupNeedThreads); + AddToList(Pool.FFirstGroupFinishing,mtpgsFinishing); + end; +end; + +procedure TProcThreadGroup.RemoveThread(AThread: TProcThread); +begin + AThread.RemoveFromList(FFirstThread,mtptlGroup); + dec(FThreadCount); +end; + +procedure TProcThreadGroup.Run(Index: PtrInt; Data: Pointer; + Item: TMultiThreadProcItem); inline; +begin + if Assigned(FTaskProcdure) then + FTaskProcdure(Index,Data,Item) + else + FTaskMethod(Index,Data,Item); +end; + +procedure TProcThreadGroup.IndexComplete(Index: PtrInt); +var + AThread: TProcThread; + NewFirstRunningThread: PtrInt; +begin + // update FirstRunningIndex + NewFirstRunningThread:=FStarterItem.Index; + AThread:=FFirstThread; + while AThread<>nil do begin + if (NewFirstRunningThread>aThread.Item.Index) + and (aThread.Item.Index<>Index) then + NewFirstRunningThread:=aThread.Item.Index; + aThread:=aThread.FNext[mtptlGroup]; + end; + FFirstRunningIndex:=NewFirstRunningThread; + // wake up threads (Note: do this even if FFirstRunningIndex has not changed) + WakeThreadsWaitingForIndex; +end; + +procedure TProcThreadGroup.WakeThreadsWaitingForIndex; +var + aThread: TProcThread; +begin + if FState<>mtpgsException then begin + // wake up waiting threads + aThread:=FFirstThread; + while aThread<>nil do begin + if (aThread.Item.FState=mtptsWaitingForIndex) + and HasFinishedIndex(aThread.Item.WaitingForIndexStart, + aThread.Item.WaitingForIndexEnd) + then begin + // wake up the thread + aThread.Item.FState:=mtptsActive; + RTLeventSetEvent(aThread.Item.fWaitForPool); + end; + aThread:=aThread.FNext[mtptlGroup]; + end; + if (FStarterItem.FState=mtptsWaitingForIndex) + and HasFinishedIndex(FStarterItem.WaitingForIndexStart,FStarterItem.WaitingForIndexEnd) + then begin + // wake up the starter thread of this group + FStarterItem.FState:=mtptsActive; + RTLeventSetEvent(FStarterItem.fWaitForPool); + end; + end else begin + // end group: wake up waiting threads + aThread:=FFirstThread; + while aThread<>nil do begin + if (aThread.Item.FState=mtptsWaitingForIndex) + then begin + // end group: wake up the thread + aThread.Item.FState:=mtptsWaitingFailed; + RTLeventSetEvent(aThread.Item.fWaitForPool); + end; + aThread:=aThread.FNext[mtptlGroup]; + end; + if (FStarterItem.FState=mtptsWaitingForIndex) + then begin + // end group: wake up the starter thread of this group + FStarterItem.FState:=mtptsWaitingFailed; + RTLeventSetEvent(FStarterItem.fWaitForPool); + end; + end; +end; + +function TProcThreadGroup.HasFinishedIndex( + aStartIndex, aEndIndex: PtrInt): boolean; +var + AThread: TProcThread; +begin + // test the finished range + if FFirstRunningIndex>aEndIndex then exit(true); + // test the unfinished range + if FLastRunningIndexnil do begin + if (AThread.Item.Index>=aStartIndex) + and (AThread.Item.Index<=aEndIndex) then + exit(false); + AThread:=AThread.FNext[mtptlGroup]; + end; + if (FStarterItem.Index>=aStartIndex) + and (FStarterItem.Index<=aEndIndex) then + exit(false); + Result:=true; +end; + +procedure TProcThreadGroup.EnterExceptionState(E: Exception); +begin + if FState=mtpgsException then exit; + case FState of + mtpgsFinishing: RemoveFromList(Pool.FFirstGroupFinishing); + mtpgsNeedThreads: RemoveFromList(Pool.FFirstGroupNeedThreads); + end; + FState:=mtpgsException; + FException:=E; + WakeThreadsWaitingForIndex; +end; + +constructor TProcThreadGroup.Create; +begin + FStarterItem:=TMultiThreadProcItem.Create; + FStarterItem.FGroup:=Self; +end; + +destructor TProcThreadGroup.Destroy; +begin + FreeAndNil(FStarterItem); + inherited Destroy; +end; + +{ TProcThreadPool } + +procedure TProcThreadPool.SetMaxThreadCount(const AValue: PtrInt); +begin + if FMaxThreadCount=AValue then exit; + if AValue<1 then raise Exception.Create('TLightWeightThreadPool.SetMaxThreadCount'); + FMaxThreadCount:=AValue; +end; + +procedure TProcThreadPool.CleanTerminatedThreads; +var + AThread: TProcThread; +begin + while FFirstTerminatedThread<>nil do begin + AThread:=FFirstTerminatedThread; + AThread.RemoveFromList(FFirstTerminatedThread,mtptlPool); + AThread.Free; + end; +end; + +constructor TProcThreadPool.Create; +begin + FMaxThreadCount:=GetSystemThreadCount; + if FMaxThreadCount<1 then + FMaxThreadCount:=1; + InitCriticalSection(FCritSection); +end; + +destructor TProcThreadPool.Destroy; + + procedure WakeWaitingStarterItems(Group: TProcThreadGroup); + begin + while Group<>nil do begin + if Group.StarterItem.FState=mtptsWaitingForIndex then begin + Group.StarterItem.FState:=mtptsWaitingFailed; + RTLeventSetEvent(Group.StarterItem.fWaitForPool); + end; + Group:=Group.FNext; + end; + end; + +var + AThread: TProcThread; +begin + FDestroying:=true; + // wake up all waiting threads + EnterPoolCriticalSection; + try + AThread:=FFirstActiveThread; + while AThread<>nil do begin + if aThread.Item.FState=mtptsWaitingForIndex then begin + aThread.Item.FState:=mtptsWaitingFailed; + RTLeventSetEvent(AThread.Item.fWaitForPool); + end; + AThread:=AThread.FNext[mtptlPool]; + end; + WakeWaitingStarterItems(FFirstGroupNeedThreads); + WakeWaitingStarterItems(FFirstGroupFinishing); + finally + LeavePoolCriticalSection; + end; + + // wait for all active threads to become inactive + while FFirstActiveThread<>nil do + Sleep(10); + + // wake up all inactive threads (without new work they will terminate) + EnterPoolCriticalSection; + try + AThread:=FFirstInactiveThread; + while AThread<>nil do begin + RTLeventSetEvent(AThread.Item.fWaitForPool); + AThread:=AThread.FNext[mtptlPool]; + end; + finally + LeavePoolCriticalSection; + end; + + // wait for all threads to terminate + while FFirstInactiveThread<>nil do + Sleep(10); + + // free threads + CleanTerminatedThreads; + + DoneCriticalsection(FCritSection); + inherited Destroy; +end; + +procedure TProcThreadPool.EnterPoolCriticalSection; +begin + EnterCriticalsection(FCritSection); +end; + +procedure TProcThreadPool.LeavePoolCriticalSection; +begin + LeaveCriticalsection(FCritSection); +end; + +procedure TProcThreadPool.DoParallel(const AMethod: TMTMethod; + StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt); +begin + if not Assigned(AMethod) then exit; + DoParallelIntern(AMethod,nil,StartIndex,EndIndex,Data,MaxThreads); +end; + +procedure TProcThreadPool.DoParallel(const AProc: TMTProcedure; + StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt); +begin + if not Assigned(AProc) then exit; + DoParallelIntern(nil,AProc,StartIndex,EndIndex,Data,MaxThreads); +end; + +procedure TProcThreadPool.DoParallelIntern(const AMethod: TMTMethod; + const AProc: TMTProcedure; + StartIndex, EndIndex: PtrInt; Data: Pointer; MaxThreads: PtrInt); +var + Group: TProcThreadGroup; + i: PtrInt; + AThread: TProcThread; + NewThread: Boolean; + Item: TMultiThreadProcItem; + HelperThreadException: Exception; +begin + if (StartIndex>EndIndex) then exit; // nothing to do + if FDestroying then raise Exception.Create('Pool destroyed'); + + if (MaxThreads>MaxThreadCount) or (MaxThreads<=0) then + MaxThreads:=MaxThreadCount; + if (StartIndex=EndIndex) or (MaxThreads<=1) then begin + // single threaded + Item:=TMultiThreadProcItem.Create; + try + for i:=StartIndex to EndIndex do begin + Item.FIndex:=i; + AMethod(i,Data,Item); + end; + finally + Item.Free; + end; + exit; + end; + + // create a new group + Group:=TProcThreadGroup.Create; + Group.FPool:=Self; + Group.FTaskData:=Data; + Group.FTaskMethod:=AMethod; + Group.FTaskProcdure:=AProc; + Group.FStartIndex:=StartIndex; + Group.FEndIndex:=EndIndex; + Group.FFirstRunningIndex:=StartIndex; + Group.FLastRunningIndex:=StartIndex; + Group.FMaxThreads:=MaxThreads; + Group.FThreadCount:=1; + Group.FStarterItem.FState:=mtptsActive; + Group.FStarterItem.FIndex:=StartIndex; + HelperThreadException:=nil; + try + // start threads + EnterPoolCriticalSection; + try + Group.AddToList(FFirstGroupNeedThreads,mtpgsNeedThreads); + while Group.NeedMoreThreads do begin + AThread:=FFirstInactiveThread; + NewThread:=false; + if AThread<>nil then begin + AThread.RemoveFromList(FFirstInactiveThread,mtptlPool); + end else if FThreadCountmtpgsException) + then begin + inc(Group.FLastRunningIndex); + i:=Group.FLastRunningIndex; + end else begin + i:=StartIndex; + end; + finally + LeavePoolCriticalSection; + end; + until i=StartIndex; + finally + // wait for Group to finish + if Group.FFirstThread<>nil then begin + EnterPoolCriticalSection; + try + Group.FStarterItem.FState:=mtptsInactive; + Group.FStarterItem.fIndex:=EndIndex;// needed for Group.HasFinishedIndex + // wake threads waiting for starter thread to finish + if Group.FStarterItem.FState<>mtptsInactive then + Group.EnterExceptionState(nil) + else + Group.WakeThreadsWaitingForIndex; + finally + LeavePoolCriticalSection; + end; + // waiting with exponential spin lock + i:=0; + while Group.FFirstThread<>nil do begin + sleep(i); + i:=i*2+1; + if i>30 then i:=30; + end; + end; + // remove group from pool + EnterPoolCriticalSection; + try + case Group.FState of + mtpgsNeedThreads: Group.RemoveFromList(FFirstGroupNeedThreads); + mtpgsFinishing: Group.RemoveFromList(FFirstGroupFinishing); + end; + finally + LeavePoolCriticalSection; + end; + HelperThreadException:=Group.FException; + Group.Free; + // free terminated threads (terminated, because of exceptions) + CleanTerminatedThreads; + end; + // if the exception occured in a helper thread raise it now + if HelperThreadException<>nil then + raise HelperThreadException; +end; + +initialization + ProcThreadPool:=TProcThreadPool.Create; + +finalization + ProcThreadPool.Free; + ProcThreadPool:=nil; + +end. + diff --git a/components/multithreadprocs/mtputils.pas b/components/multithreadprocs/mtputils.pas new file mode 100644 index 000000000..4c5a6a857 --- /dev/null +++ b/components/multithreadprocs/mtputils.pas @@ -0,0 +1,200 @@ +{ Utilities using light weight threads. + + This file is part of the Free Pascal run time library. + + Copyright (C) 2008 Mattias Gaertner mattias@freepascal.org + + See the file COPYING.FPC, included in this distribution, + for details about the copyright. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + + **********************************************************************} +{ + Abstract: + +} +unit MTPUtils; + +{$mode objfpc}{$H+} + +interface + +uses + Classes, SysUtils, MTProcs; + +type + + { TParallelSortPointerList } + + TParallelSortPointerList = class + protected + fBlockSize: PtrInt; + fBlockCntPowOf2Offset: PtrInt; + FMergeBuffer: PPointer; + procedure MTPSort(Index: PtrInt; Data: Pointer; Item: TMultiThreadProcItem); + public + List: PPointer; + Count: PtrInt; + Compare: TListSortCompare; + BlockCnt: PtrInt; + constructor Create(aList: PPointer; aCount: PtrInt; const aCompare: TListSortCompare; + MaxThreadCount: integer = 0); + procedure Sort; + end; + +procedure ParallelSortFPList(List: TFPList; const Compare: TListSortCompare; + MaxThreadCount: integer = 0); + +implementation + +procedure ParallelSortFPList(List: TFPList; const Compare: TListSortCompare; + MaxThreadCount: integer = 0); +var + Sorter: TParallelSortPointerList; +begin + if List.Count<=1 then exit; + Sorter:=TParallelSortPointerList.Create(@List.List[0],List.Count,Compare, + MaxThreadCount); + try + Sorter.Sort; + finally + Sorter.Free; + end; +end; + +{ TParallelSortPointerList } + +procedure TParallelSortPointerList.MTPSort(Index: PtrInt; Data: Pointer; + Item: TMultiThreadProcItem); + + procedure MergeSort(L, M, R: PtrInt; Recursive: boolean); + var + Src1: PtrInt; + Src2: PtrInt; + Dest1: PtrInt; + begin + if R-L<=1 then begin + // sort lists of 1 and 2 items directly + if L0 then begin + FMergeBuffer[L]:=List[L]; + List[L]:=List[R]; + List[R]:=FMergeBuffer[L]; + end; + end; + exit; + end; + // sort recursively + if Recursive then begin + MergeSort(L,(L+M) div 2,M-1,true); + MergeSort(M,(M+R+1) div 2,R,true); + end; + // merge both blocks + Src1:=L; + Src2:=M; + Dest1:=L; + repeat + if (Src1R) or (Compare(List[Src1],List[Src2])<=0)) then begin + FMergeBuffer[Dest1]:=List[Src1]; + inc(Dest1); + inc(Src1); + end else if (Src2<=R) then begin + FMergeBuffer[Dest1]:=List[Src2]; + inc(Dest1); + inc(Src2); + end else + break; + until false; + // write the mergebuffer back + Src1:=L; + Dest1:=l; + while Src1<=R do begin + List[Dest1]:=FMergeBuffer[Src1]; + inc(Src1); + inc(Dest1); + end; + end; + +var + L, M, R: PtrInt; + i: integer; + NormIndex: Integer; + Range: integer; + MergeIndex: Integer; +begin + L:=fBlockSize*Index; + R:=L+fBlockSize-1; // middle block + if R>=Count then + R:=Count-1; // last block + WriteLn('TParallelSortPointerList.LWTSort Index=',Index,' sort block: ',L,' ',(L+R+1) div 2,' ',R); + MergeSort(L,(L+R+1) div 2,R,true); + + // merge + // 0 1 2 3 4 5 6 7 + // \/ \/ \/ \/ + // \/ \/ + // \/ + // For example: BlockCnt = 5 => Index in 0..4 + // fBlockCntPowOf2Offset = 3 (=8-5) + // NormIndex = Index + 3 => NormIndex in 3..7 + i:=0; + NormIndex:=Index+fBlockCntPowOf2Offset; + repeat + Range:=1 shl i; + if NormIndex and Range=0 then break; + // merge left and right block(s) + MergeIndex:=NormIndex-Range-fBlockCntPowOf2Offset; + if (MergeIndex+Range-1>=0) then begin + // wait until left blocks have finished + WriteLn('TParallelSortPointerList.LWTSort Index=',Index,' wait for block ',MergeIndex); + if (MergeIndex>=0) and (not Item.WaitForIndex(MergeIndex)) then exit; + // compute left and right block bounds + M:=L; + L:=(MergeIndex-Range+1)*fBlockSize; + if L<0 then L:=0; + WriteLn('TParallelSortPointerList.LWTSort Index=',Index,' merge blocks ',L,' ',M,' ',R); + MergeSort(L,M,R,false); + end; + inc(i); + until false; + WriteLn('TParallelSortPointerList.LWTSort END Index='+IntToStr(Index)); +end; + +constructor TParallelSortPointerList.Create(aList: PPointer; aCount: PtrInt; + const aCompare: TListSortCompare; MaxThreadCount: integer); +begin + List:=aList; + Count:=aCount; + Compare:=aCompare; + BlockCnt:=Count div 100; // at least 100 items per thread + if BlockCnt>ProcThreadPool.MaxThreadCount then + BlockCnt:=ProcThreadPool.MaxThreadCount; + if (MaxThreadCount>0) and (BlockCnt>MaxThreadCount) then + BlockCnt:=MaxThreadCount; + if BlockCnt<1 then BlockCnt:=1; +end; + +procedure TParallelSortPointerList.Sort; +begin + if (Count<=1) then exit; + fBlockSize:=(Count+BlockCnt-1) div BlockCnt; + fBlockCntPowOf2Offset:=1; + while fBlockCntPowOf2Offset