mirror of
				https://github.com/go-micro/go-micro.git
				synced 2025-10-30 23:27:41 +02:00 
			
		
		
		
	Improve stream processing
This commit is contained in:
		| @@ -94,6 +94,17 @@ func (c *Codec) Write(m *codec.Message, b interface{}) error { | ||||
| 		m.Header[":status"] = "200" | ||||
| 		m.Header["grpc-status"] = "0" | ||||
| 		//		m.Header["grpc-message"] = "" | ||||
| 	case codec.Error: | ||||
| 		m.Header["Trailer"] = "grpc-status, grpc-message" | ||||
| 		// micro end of stream | ||||
| 		if m.Error == "EOS" { | ||||
| 			m.Header["grpc-status"] = "0" | ||||
| 		} else { | ||||
| 			m.Header["grpc-message"] = m.Error | ||||
| 			m.Header["grpc-status"] = "13" | ||||
| 		} | ||||
|  | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	// marshal content | ||||
|   | ||||
| @@ -154,34 +154,6 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { | ||||
| 		sockets[id] = psock | ||||
| 		mtx.Unlock() | ||||
|  | ||||
| 		// wait for processing to exit | ||||
| 		wg.Add(1) | ||||
|  | ||||
| 		// process the outbound messages from the socket | ||||
| 		go func(id string, psock *socket.Socket) { | ||||
| 			defer func() { | ||||
| 				wg.Done() | ||||
| 			}() | ||||
|  | ||||
| 			for { | ||||
| 				// get the message from our internal handler/stream | ||||
| 				m := new(transport.Message) | ||||
| 				if err := psock.Process(m); err != nil { | ||||
| 					// delete the socket | ||||
| 					mtx.Lock() | ||||
| 					delete(sockets, id) | ||||
| 					mtx.Unlock() | ||||
| 					return | ||||
| 				} | ||||
|  | ||||
| 				// send the message back over the socket | ||||
| 				if err := sock.Send(m); err != nil { | ||||
| 					return | ||||
| 				} | ||||
|  | ||||
| 			} | ||||
| 		}(id, psock) | ||||
|  | ||||
| 		// now walk the usual path | ||||
|  | ||||
| 		// we use this Timeout header to set a server deadline | ||||
| @@ -287,6 +259,33 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { | ||||
| 			r = rpcRouter{handler} | ||||
| 		} | ||||
|  | ||||
| 		// wait for processing to exit | ||||
| 		wg.Add(1) | ||||
|  | ||||
| 		// process the outbound messages from the socket | ||||
| 		go func(id string, psock *socket.Socket) { | ||||
| 			defer func() { | ||||
| 				wg.Done() | ||||
| 			}() | ||||
|  | ||||
| 			for { | ||||
| 				// get the message from our internal handler/stream | ||||
| 				m := new(transport.Message) | ||||
| 				if err := psock.Process(m); err != nil { | ||||
| 					// delete the socket | ||||
| 					mtx.Lock() | ||||
| 					delete(sockets, id) | ||||
| 					mtx.Unlock() | ||||
| 					return | ||||
| 				} | ||||
|  | ||||
| 				// send the message back over the socket | ||||
| 				if err := sock.Send(m); err != nil { | ||||
| 					return | ||||
| 				} | ||||
| 			} | ||||
| 		}(id, psock) | ||||
|  | ||||
| 		// serve the request in a go routine as this may be a stream | ||||
| 		go func(id string, psock *socket.Socket) { | ||||
| 			defer psock.Close() | ||||
|   | ||||
| @@ -324,6 +324,9 @@ func (h *httpTransportSocket) Send(m *Message) error { | ||||
| 	// write request | ||||
| 	_, err := h.w.Write(m.Body) | ||||
|  | ||||
| 	// flush the trailers | ||||
| 	h.w.(http.Flusher).Flush() | ||||
|  | ||||
| 	return err | ||||
| } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user