TCP protocole now supports block type prefixed messages.

git-svn-id: https://svn.code.sf.net/p/lazarus-ccr/svn@1783 8e941d3f-bd1b-0410-a28a-d453659cc2b4
This commit is contained in:
inoussa
2011-08-08 02:24:18 +00:00
parent d1ae85de1d
commit 0a24ccf684
4 changed files with 124 additions and 44 deletions

View File

@ -54,6 +54,7 @@ Type
FContentType : string; FContentType : string;
FFormat : string; FFormat : string;
FTarget : string; FTarget : string;
FUseBlockType : Boolean;
protected protected
procedure DoSend(const AData; const ALength : Int64); virtual; abstract; procedure DoSend(const AData; const ALength : Int64); virtual; abstract;
function DoReceive(var AData; const ALength : Int64) : Int64; virtual; abstract; function DoReceive(var AData; const ALength : Int64) : Int64; virtual; abstract;
@ -63,6 +64,7 @@ Type
property Target : string Read FTarget Write FTarget; property Target : string Read FTarget Write FTarget;
property ContentType : string Read FContentType Write FContentType; property ContentType : string Read FContentType Write FContentType;
property Format : string read FFormat write FFormat; property Format : string read FFormat write FFormat;
property UseBlockType : Boolean read FUseBlockType write FUseBlockType default false;
end; end;
{$M+} {$M+}
@ -113,6 +115,8 @@ begin
buffStream := TMemoryStream.Create(); buffStream := TMemoryStream.Create();
Try Try
wrtr := CreateBinaryWriter(buffStream); wrtr := CreateBinaryWriter(buffStream);
if UseBlockType then
wrtr.WriteInt32S(WST_BLOCK_TYPE);
wrtr.WriteInt32S(0); wrtr.WriteInt32S(0);
wrtr.WriteAnsiStr(Target); wrtr.WriteAnsiStr(Target);
wrtr.WriteAnsiStr(ContentType); wrtr.WriteAnsiStr(ContentType);
@ -134,8 +138,13 @@ begin
end; end;
wrtr.WriteBinary(binBuff); wrtr.WriteBinary(binBuff);
SetLength(binBuff,0); SetLength(binBuff,0);
buffStream.Position := 0; if UseBlockType then begin
wrtr.WriteInt32S(buffStream.Size-4); buffStream.Position := 4;
wrtr.WriteInt32S(buffStream.Size-({BlockType}4+4));
end else begin
buffStream.Position := 0;
wrtr.WriteInt32S(buffStream.Size-4);
end;
buffStream.Position := 0; buffStream.Position := 0;
DoSend(buffStream.Memory^,buffStream.Size); DoSend(buffStream.Memory^,buffStream.Size);

View File

@ -19,8 +19,7 @@ uses
Classes, SysUtils, ssockets, server_listener, wst_types; Classes, SysUtils, ssockets, server_listener, wst_types;
const const
sSERVER_PORT = 1234; sSERVER_PORT = 1234;
type type
TwstFPCTcpListener = class; TwstFPCTcpListener = class;
@ -33,7 +32,7 @@ type
FSocket : TSocketStream; FSocket : TSocketStream;
FOwner : TwstFPCTcpListener; FOwner : TwstFPCTcpListener;
private private
function ReadRequest(ARequest : TStream):Integer; function ReadRequest(ARequest : TStream; var ABlockType : LongInt):Integer;
procedure SendResponse(AResponse : TMemoryStream); procedure SendResponse(AResponse : TMemoryStream);
public public
constructor Create (ASocket : TSocketStream; AOwner : TwstFPCTcpListener); constructor Create (ASocket : TSocketStream; AOwner : TwstFPCTcpListener);
@ -58,11 +57,11 @@ type
procedure SuspendAsSoonAsPossible(); procedure SuspendAsSoonAsPossible();
procedure ResumeListening(); procedure ResumeListening();
property DefaultTimeOut : Integer read FDefaultTimeOut write FDefaultTimeOut; property DefaultTimeOut : Integer read FDefaultTimeOut write FDefaultTimeOut;
end; end;
{ TwstFPCTcpListener } { TwstFPCTcpListener }
TwstFPCTcpListener = class(TwstListener) TwstFPCTcpListener = class(TwstBaseTcpListener)
private private
FServerThread : TServerListnerThread; FServerThread : TServerListnerThread;
FPort : Integer; FPort : Integer;
@ -76,27 +75,38 @@ type
implementation implementation
uses binary_streamer, server_service_intf, server_service_imputils, math; uses
wst_consts, binary_streamer, server_service_intf, server_service_imputils, math;
resourcestring
SErrReadingFromSocket = 'Error %d reading data from socket';
SErrWritingToSocket = 'Error %d writing data to socket';
{ TClientHandlerThread } { TClientHandlerThread }
function TClientHandlerThread.ReadRequest(ARequest : TStream): Integer; function TClientHandlerThread.ReadRequest(
ARequest : TStream;
var ABlockType : LongInt
): Integer;
var var
binBuff : TByteDynArray; binBuff : TByteDynArray;
bufferLen : LongInt; bufferLen, bktype : TInt32S;
i, j, c, readBufferLen : PtrInt; i, j, c : PtrInt;
begin begin
Result := 0; Result := 0;
if (tloHandleBlockType in FOwner.Options) then begin
bktype := 0;
j:=FSocket.Read(bktype,SizeOf(bktype));
if (j<0) then
raise Exception.CreateFmt(SERR_ErrorReadindDataToSocket,[FSocket.LastError]);
if (j=0) then
Exit(0) // Closed gracefully
else
bktype:=Reverse_32(bktype);
end;
bufferLen := 0; bufferLen := 0;
j:=FSocket.Read(bufferLen,SizeOf(bufferLen)); j:=FSocket.Read(bufferLen,SizeOf(bufferLen));
if (j<0) then if (j<0) then
Raise Exception.CreateFmt(SErrReadingFromSocket,[FSocket.LastError]); Raise Exception.CreateFmt(SERR_ErrorReadindDataToSocket,[FSocket.LastError]);
if (j=0) then if (j=0) then
Exit(0) // Closed gracefully Exit(0) // Closed gracefully
else else
@ -110,7 +120,7 @@ begin
repeat repeat
j:=FSocket.Read(binBuff[0],i); j:=FSocket.Read(binBuff[0],i);
If (J<=0) then If (J<=0) then
Raise Exception.CreateFmt(SErrReadingFromSocket,[FSocket.LastError]); Raise Exception.CreateFmt(SERR_ErrorReadindDataToSocket,[FSocket.LastError]);
ARequest.Write(binBuff[0],j); ARequest.Write(binBuff[0],j);
Inc(c,j); Inc(c,j);
I:=Min(1024,(bufferLen - c )) I:=Min(1024,(bufferLen - c ))
@ -120,6 +130,9 @@ begin
if C<ARequest.Size then if C<ARequest.Size then
ARequest.Size:=C; ARequest.Size:=C;
end; end;
if (tloHandleBlockType in FOwner.Options) then
ABlockType := bktype;
end; end;
procedure TClientHandlerThread.SendResponse(AResponse : TMemoryStream); procedure TClientHandlerThread.SendResponse(AResponse : TMemoryStream);
@ -134,7 +147,7 @@ begin
Repeat Repeat
W:=FSocket.Write(P^,C); W:=FSocket.Write(P^,C);
if (W<0) then if (W<0) then
Raise Exception.CreateFmt(SErrWritingToSocket,[FSocket.LastError]); Raise Exception.CreateFmt(SERR_ErrorSendindDataToSocket,[FSocket.LastError]);
Inc(P,W); Inc(P,W);
Dec(C,W); Dec(C,W);
Until (C=0) or (w=0); Until (C=0) or (w=0);
@ -174,26 +187,33 @@ var
rqst : IRequestBuffer; rqst : IRequestBuffer;
ARequest,AResponse : TMemoryStream; ARequest,AResponse : TMemoryStream;
i : PtrUInt; i : PtrUInt;
blocktype : TInt32S;
begin begin
while not Terminated do while not Terminated do begin
Try Try
ARequest:=TMemoryStream.Create; blocktype := 0;
AResponse := nil;
ARequest := TMemoryStream.Create;
try try
if ReadRequest(ARequest)>SizeOf(LongInt) then if ReadRequest(ARequest,blocktype)>SizeOf(LongInt) then begin
begin AResponse := TMemoryStream.Create();
rdr := CreateBinaryReader(ARequest); if (tloHandleBlockType in FOwner.Options) and
trgt := rdr.ReadAnsiStr(); (blocktype <> WST_BLOCK_TYPE)
ctntyp := rdr.ReadAnsiStr(); then begin
frmt := rdr.ReadAnsiStr(); if (FOwner.UnknownBlockHandler <> nil) then
buff := rdr.ReadBinary(); FOwner.UnknownBlockHandler.Execute(blocktype,ARequest,AResponse);
rdr := nil; end else begin
ARequest.Size := 0; rdr := CreateBinaryReader(ARequest);
ARequest.Write(buff[0],Length(buff)); trgt := rdr.ReadAnsiStr();
SetLength(buff,0); ctntyp := rdr.ReadAnsiStr();
ARequest.Position := 0; frmt := rdr.ReadAnsiStr();
AResponse:=TMemoryStream.Create; buff := rdr.ReadBinary();
try rdr := nil;
ARequest.Size := 0;
ARequest.Write(buff[0],Length(buff));
SetLength(buff,0);
ARequest.Position := 0;
rqst := TRequestBuffer.Create(trgt,ctntyp,ARequest,AResponse,frmt); rqst := TRequestBuffer.Create(trgt,ctntyp,ARequest,AResponse,frmt);
//rqst.GetPropertyManager().SetProperty(sREMOTE_IP,FSocketObject.GetRemoteSinIP()); //rqst.GetPropertyManager().SetProperty(sREMOTE_IP,FSocketObject.GetRemoteSinIP());
//rqst.GetPropertyManager().SetProperty(sREMOTE_PORT,IntToStr(FSocketObject.GetRemoteSinPort())); //rqst.GetPropertyManager().SetProperty(sREMOTE_PORT,IntToStr(FSocketObject.GetRemoteSinPort()));
@ -206,21 +226,21 @@ begin
wrtr := CreateBinaryWriter(AResponse); wrtr := CreateBinaryWriter(AResponse);
wrtr.WriteBinary(buff); wrtr.WriteBinary(buff);
SetLength(buff,0); SetLength(buff,0);
end;
if (AResponse.Size > 0) then
SendResponse(AResponse); SendResponse(AResponse);
finally end;
AResponse.Free;
end;
end;
finally finally
AResponse.Free;
ARequest.Free; ARequest.Free;
end; end;
except except
on e : Exception do on e : Exception do begin
begin
Terminate; Terminate;
FOwner.NotifyMessage(Format('Error : ThreadID = %d; Message = %s',[Self.ThreadID,e.Message])); FOwner.NotifyMessage(Format('Error : ThreadID = %d; Message = %s',[Self.ThreadID,e.Message]));
end; end;
end; end;
end;
end; end;
{ TServerListnerThread } { TServerListnerThread }

View File

@ -23,9 +23,20 @@ const
sSERVICES_PREFIXE = 'services'; sSERVICES_PREFIXE = 'services';
sWSDL = 'WSDL'; sWSDL = 'WSDL';
type type
TTcpListenerOption = (tloHandleBlockType);
TTcpListenerOptions = set of TTcpListenerOption;
TListnerNotifyMessage = procedure(Sender : TObject; const AMsg : string) of object; TListnerNotifyMessage = procedure(Sender : TObject; const AMsg : string) of object;
IBlockHandler = interface
['{E0C50F08-A2C3-41D7-ACD5-E7867DD9F981}']
procedure Execute(
const ABlockType : LongInt;
ARequestBlock,
AResponseBlock : TStream
);
end;
{ TwstListener } { TwstListener }
@ -43,10 +54,25 @@ type
property OnNotifyMessage : TListnerNotifyMessage read FOnNotifyMessage write SetOnNotifyMessage; property OnNotifyMessage : TListnerNotifyMessage read FOnNotifyMessage write SetOnNotifyMessage;
end; end;
{ TwstBaseTcpListener }
TwstBaseTcpListener = class(TwstListener)
private
FOptions : TTcpListenerOptions;
FUnknownBlockHandler : IBlockHandler;
protected
procedure CheckActive(const AActive : Boolean; ACaller : string);
procedure SetOptions(const AValue : TTcpListenerOptions);
procedure SetUnknownBlockHandler(const AValue : IBlockHandler);
public
property Options : TTcpListenerOptions read FOptions write SetOptions;
property UnknownBlockHandler : IBlockHandler read FUnknownBlockHandler write SetUnknownBlockHandler;
end;
function GenerateWSDLHtmlTable(const AServicesModulePath : string=''): string; function GenerateWSDLHtmlTable(const AServicesModulePath : string=''): string;
implementation implementation
uses base_service_intf, metadata_repository, uses wst_consts, base_service_intf, metadata_repository,
metadata_service, metadata_service_binder, metadata_service_imp ; metadata_service, metadata_service_binder, metadata_service_imp ;
@ -89,6 +115,30 @@ begin
'</html>'; '</html>';
end; end;
{ TwstBaseTcpListener }
procedure TwstBaseTcpListener.CheckActive(const AActive : Boolean; ACaller : string);
begin
if (IsActive() <> AActive) then
raise Exception.CreateFmt(SERR_ObjectStateDoesNotAllowOperation,[ACaller]);
end;
procedure TwstBaseTcpListener.SetOptions(const AValue : TTcpListenerOptions);
begin
CheckActive(False,'SetOptions');
if (FOptions=AValue) then
exit;
FOptions:=AValue;
end;
procedure TwstBaseTcpListener.SetUnknownBlockHandler(const AValue : IBlockHandler);
begin
CheckActive(False,'SetUnknownBlockHandler');
if (FUnknownBlockHandler = AValue) then
exit;
FUnknownBlockHandler := AValue;
end;
{ TwstListener } { TwstListener }
procedure TwstListener.SetOnNotifyMessage(const AValue : TListnerNotifyMessage); procedure TwstListener.SetOnNotifyMessage(const AValue : TListnerNotifyMessage);

View File

@ -16,7 +16,8 @@ unit wst_consts;
interface interface
const const
WST_BLOCK_TYPE = LongInt(56789);
sWST_SIGNATURE = 'WST_METADATA_0.6'; sWST_SIGNATURE = 'WST_METADATA_0.6';
resourcestring resourcestring