diff --git a/wst/trunk/fpc_tcp_protocol.pas b/wst/trunk/fpc_tcp_protocol.pas new file mode 100644 index 000000000..f8ab942c5 --- /dev/null +++ b/wst/trunk/fpc_tcp_protocol.pas @@ -0,0 +1,229 @@ +{ + This file is part of the Web Service Toolkit + Copyright (c) 2006 by Inoussa OUEDRAOGO + + This file is provide under modified LGPL licence + ( the files COPYING.modifiedLGPL and COPYING.LGPL). + + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +} +{$INCLUDE wst_global.inc} +unit fpc_tcp_protocol; + +interface + +uses + Classes, SysUtils, + service_intf, imp_utils, base_service_intf, client_utils, + ssockets; + +//{$DEFINE WST_DBG} + +Const + sTRANSPORT_NAME = 'TCP'; + +Type + + ETCPException = class(EServiceException); + +{$M+} + { TTCPTransport } + TTCPTransport = class(TBaseTransport,ITransport) + Private + FFormat : string; + FConnection : TInetSocket; + FContentType : string; + FTarget: string; + FAddress : string; + FPort : string; + procedure ReadResponse(ADest: TStream); + procedure SendRequest(ARequest: TStream); + private + procedure Connect(); + public + constructor Create();override; + destructor Destroy();override; + function GetTransportName() : string; override; + procedure SendAndReceive(ARequest,AResponse:TStream); override; + Published + property Target : string Read FTarget Write FTarget; + property ContentType : string Read FContentType Write FContentType; + property Address : string Read FAddress Write FAddress; + property Port : string Read FPort Write FPort; + property Format : string read FFormat write FFormat; + End; + +procedure FPC_RegisterTCP_Transport(); + +implementation + +uses + binary_streamer, Math, wst_types; + +{ TTCPTransport } + +procedure TTCPTransport.Connect(); + +begin + if FConnection=Nil then + FConnection:=TInetSocket.Create(FAddress,StrToInt(Port)); +end; + +constructor TTCPTransport.Create(); +begin + inherited Create(); +end; + +destructor TTCPTransport.Destroy(); +begin + FreeAndNil(FConnection); + inherited Destroy(); +end; + +function TTCPTransport.GetTransportName() : string; +begin + Result := sTRANSPORT_NAME; +end; + +procedure TTCPTransport.SendRequest(ARequest : TStream); + + Procedure SendBuffer(P : PByte; ACount : Integer); + + Var + c : integer; + + begin + Repeat + C:=FConnection.Write(P^,ACount); + if (C<0) then + Raise ETCPException.CreateFmt('Error %d sending data to socket',[FConnection.LastError]); + If (C>0) then + begin + inc(P,C); + Dec(ACount,C); + end; + Until (ACount=0); + end; + + +Var + M : TMemoryStream; + binBuff : TByteDynArray; + wrtr : IDataStore; + +begin + SetLength(binBuff,ARequest.Size); + ARequest.Position := 0; + ARequest.ReadBuffer(binBuff[0],Length(binBuff)); + M := TMemoryStream.Create(); + Try + wrtr:=CreateBinaryWriter(M); + wrtr.WriteInt32S(0); + wrtr.WriteAnsiStr(Target); + wrtr.WriteAnsiStr(ContentType); + wrtr.WriteAnsiStr(Self.Format); + wrtr.WriteBinary(binBuff); + M.Position := 0; + wrtr.WriteInt32S(M.Size-4); + M.Position := 0; + SendBuffer(TMemoryStream(M).Memory,M.Size); + Finally + M.Free; + end; +end; + +procedure TTCPTransport.ReadResponse(ADest : TStream); + + Procedure ReadBuffer(Var Buf; ACount : Integer); + + Var + P : PByte; + C : integer; + + begin + if (ACount=0) then exit; + P:=PByte(@Buf); + repeat + C:=FConnection.Read(P^,ACount); + If (C<=0) then + Raise ETCPException.CreateFmt('Error %d reading data from socket',[FConnection.LastError]); + If (C>0) then + begin + Inc(P,C); + Dec(ACount,C); + end + Until (ACount=0); + end; + +var + bufferLen : LongInt; + i, j, c : PtrInt; + locBinBuff : TByteDynArray; +begin + bufferLen := 0; + ReadBuffer(BufferLen,SizeOf(BufferLen)); + bufferLen := Reverse_32(bufferLen); + ADest.Size := bufferLen; + ADest.Position:=0; + if (bufferLen>0) then + begin + c := 0; + i := Min(1024,Bufferlen); + SetLength(locBinBuff,i); + repeat + ReadBuffer(locBinBuff[0],i); + ADest.Write(locBinBuff[0],i); + Inc(c,i); + i:=Min(1024,(bufferLen-c)); + until (i=0); + end; +end; + +procedure TTCPTransport.SendAndReceive(ARequest, AResponse: TStream); + +Var + buffStream : TMemoryStream; + binBuff : TByteDynArray; + M : TStream; +begin + // Connect + Connect(); + // Filter + if HasFilter() then + M:=TMemoryStream.Create() + else + M:=ARequest; + try + if HasFilter() then + FilterInput(ARequest,M); + // Actually send buffer + SendRequest(M); + Finally + if (M<>ARequest) then + FreeAndNil(M); + end; + // Prepare to read response + if HasFilter() then + M:=TmemoryStream.Create + else + M:=AResponse; + try + // Actually read response + ReadResponse(M); + if HasFilter() then + FilterOutput(M,AResponse); + Finally + if (M<>AResponse) then + FreeAndNil(M); + end; +end; + +procedure FPC_RegisterTCP_Transport(); +begin + GetTransportRegistry().Register(sTRANSPORT_NAME,TSimpleItemFactory.Create(TTCPTransport) as IItemFactory); +end; + +end. diff --git a/wst/trunk/fpc_tcp_server.pas b/wst/trunk/fpc_tcp_server.pas new file mode 100644 index 000000000..d272e0ac4 --- /dev/null +++ b/wst/trunk/fpc_tcp_server.pas @@ -0,0 +1,324 @@ +{ + This file is part of the Web Service Toolkit + Copyright (c) 2006 by Inoussa OUEDRAOGO + + This file is provide under modified LGPL licence + ( the files COPYING.modifiedLGPL and COPYING.LGPL). + + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +} +{$INCLUDE wst_global.inc} +unit fpc_tcp_server; + +interface + +uses + Classes, SysUtils, ssockets, server_listener, wst_types; + +const + sSERVER_PORT = 1234; + +type + + TwstFPCTcpListener = class; + + { TClientHandlerThread } + + TClientHandlerThread = class(TThread) + private + FDefaultTimeOut: Integer; + FSocket : TSocketStream; + FOwner : TwstFPCTcpListener; + private + function ReadRequest(ARequest : TStream):Integer; + procedure SendResponse(AResponse : TMemoryStream); + public + constructor Create (ASocket : TSocketStream; AOwner : TwstFPCTcpListener); + destructor Destroy();override; + procedure Execute(); override; + property DefaultTimeOut : Integer read FDefaultTimeOut write FDefaultTimeOut; + end; + + { TServerListnerThread } + + TServerListnerThread = class(TThread) + procedure DoConnect(Sender: TObject; Data: TSocketStream); + private + FDefaultTimeOut: Integer; + FSocketObject : TInetServer; + FSuspendingCount : Integer; + FOwner : TwstFPCTcpListener; + public + constructor Create(AOwner : TwstFPCTcpListener); + destructor Destroy(); override; + procedure Execute(); override; + procedure SuspendAsSoonAsPossible(); + procedure ResumeListening(); + property DefaultTimeOut : Integer read FDefaultTimeOut write FDefaultTimeOut; + end; + + { TwstFPCTcpListener } + + TwstFPCTcpListener = class(TwstListener) + private + FServerThread : TServerListnerThread; + FPort : Integer; + public + constructor Create(const APort : Integer = sSERVER_PORT); + destructor Destroy();override; + procedure Start();override; + procedure Stop();override; + function IsActive : Boolean; override; + end; + +implementation + +uses binary_streamer, server_service_intf, server_service_imputils, math; + +resourcestring + SErrReadingFromSocket = 'Error %d reading data from socket'; + SErrWritingToSocket = 'Error %d writing data to socket'; + +{ TClientHandlerThread } + + +function TClientHandlerThread.ReadRequest(ARequest : TStream): Integer; +var + binBuff : TByteDynArray; + bufferLen : LongInt; + i, j, c, readBufferLen : PtrInt; + +begin + Result := 0; + bufferLen := 0; + j:=FSocket.Read(bufferLen,SizeOf(bufferLen)); + if (j<0) then + Raise Exception.CreateFmt(SErrReadingFromSocket,[FSocket.LastError]); + if (j=0) then + Exit(0) // Closed gracefully + else + begin + bufferLen:=Reverse_32(bufferLen); + ARequest.Size:=bufferLen; + c:=0; + i:=1024; + I:=Min(BufferLen,1024); + SetLength(binBuff,i); + repeat + j:=FSocket.Read(binBuff[0],i); + If (J<=0) then + Raise Exception.CreateFmt(SErrReadingFromSocket,[FSocket.LastError]); + ARequest.Write(binBuff[0],j); + Inc(c,j); + I:=Min(1024,(bufferLen - c )) + until (i=0) or (j=0); + ARequest.Position:=0; + Result:=C; + if CSizeOf(LongInt) then + begin + rdr := CreateBinaryReader(ARequest); + trgt := rdr.ReadAnsiStr(); + ctntyp := rdr.ReadAnsiStr(); + frmt := rdr.ReadAnsiStr(); + buff := rdr.ReadBinary(); + rdr := nil; + ARequest.Size := 0; + ARequest.Write(buff[0],Length(buff)); + SetLength(buff,0); + ARequest.Position := 0; + AResponse:=TMemoryStream.Create; + try + rqst := TRequestBuffer.Create(trgt,ctntyp,ARequest,AResponse,frmt); + //rqst.GetPropertyManager().SetProperty(sREMOTE_IP,FSocketObject.GetRemoteSinIP()); + //rqst.GetPropertyManager().SetProperty(sREMOTE_PORT,IntToStr(FSocketObject.GetRemoteSinPort())); + HandleServiceRequest(rqst); + i := AResponse.Size; + SetLength(buff,i); + AResponse.Position := 0; + AResponse.Read(buff[0],i); + AResponse.Size := 0; + wrtr := CreateBinaryWriter(AResponse); + wrtr.WriteBinary(buff); + SetLength(buff,0); + SendResponse(AResponse); + finally + AResponse.Free; + end; + end; + finally + ARequest.Free; + end; + except + on e : Exception do + begin + Terminate; + FOwner.NotifyMessage(Format('Error : ThreadID = %d; Message = %s',[Self.ThreadID,e.Message])); + end; + end; +end; + +{ TServerListnerThread } + +procedure TServerListnerThread.DoConnect(Sender: TObject; Data: TSocketStream); +begin + if (FSuspendingCount>0 ) then + Suspend(); + if Not Terminated then + TClientHandlerThread.Create(Data,FOwner) + else + Data.Free; +end; + +constructor TServerListnerThread.Create(AOwner : TwstFPCTcpListener); +begin + FOwner := AOwner; + inherited Create(false); +end; + +destructor TServerListnerThread.Destroy(); +begin + FreeAndNil(FSocketObject); + inherited Destroy(); +end; + +procedure TServerListnerThread.Execute(); + +begin + try + FSocketObject:=TInetServer.Create(FOwner.FPort); + try + FSocketObject.Bind(); + FSocketObject.OnConnect:=@DoConnect; + FSocketObject.StartAccepting(); + Finally + FSocketObject.Free; + end; + except + on e : Exception do + begin + Terminate; + FOwner.NotifyMessage(Format('Listner Thread Error : ThreadID = %d; Message = %s',[Self.ThreadID,e.Message])); + FOwner.NotifyMessage('Listner stoped.'); + end; + end; +end; + +procedure TServerListnerThread.SuspendAsSoonAsPossible(); +begin + InterLockedIncrement(FSuspendingCount); +end; + +procedure TServerListnerThread.ResumeListening(); +begin + InterLockedDecrement(FSuspendingCount); + if (FSuspendingCount<=0 ) then + begin + if Suspended then + Resume(); + end; +end; + +{ TwstFPCTcpListener } + +constructor TwstFPCTcpListener.Create(const APort : Integer = sServer_Port); +begin + FPort:=APort; +end; + +destructor TwstFPCTcpListener.Destroy(); +begin + if (FServerThread<>nil ) then + begin + FServerThread.Terminate(); + Start(); + end; + inherited Destroy(); +end; + +function TwstFPCTcpListener.IsActive: Boolean; +begin + Result := (FServerThread <> nil) and (not FServerThread.Suspended); +end; + +procedure TwstFPCTcpListener.Start(); +begin + if (FServerThread=nil) then + FServerThread:=TServerListnerThread.Create(Self); + if FServerThread.Suspended then + FServerThread.ResumeListening(); +end; + +procedure TwstFPCTcpListener.Stop(); +begin + if (FServerThread<>nil) and (not FServerThread.Suspended) then + FServerThread.SuspendAsSoonAsPossible(); +end; + +end. +