协议使用方式
Triple 协议是 Dubbo3 的主力协议,完整兼容 gRPC over HTTP/2,并在协议层面扩展了负载均衡和流量控制相关机制。本文档旨在指导用户正确的使用 Triple 协议。
在开始前,需要决定服务使用的序列化方式,如果为新服务,推荐使用 protobuf 作为默认序列化,在性能和跨语言上的效果都会更好。如果是原有服务想进行协议升级,Triple 协议也已经支持其他序列化方式,如 Hessian / JSON 等
Protobuf 方式
编写 IDL 文件
syntax = "proto3"; option java_multiple_files = true; option java_package = "org.apache.dubbo.hello"; option java_outer_classname = "HelloWorldProto"; option objc_class_prefix = "HLW"; package helloworld; // The request message containing the user's name. message HelloRequest { string name = 1; } // The response message containing the greetings message HelloReply { string message = 1; }
添加编译 protobuf 的 extension 和 plugin (以 maven 为例)
<extensions> <extension> <groupId>kr.motd.maven</groupId> <artifactId>os-maven-plugin</artifactId> <version>1.6.1</version> </extension> </extensions> <plugins> <plugin> <groupId>org.xolstice.maven.plugins</groupId> <artifactId>protobuf-maven-plugin</artifactId> <version>0.6.1</version> <configuration> <protocArtifact>com.google.protobuf:protoc:3.7.1:exe:${os.detected.classifier}</protocArtifact> <pluginId>triple-java</pluginId> <outputDirectory>build/generated/source/proto/main/java</outputDirectory> </configuration> <executions> <execution> <goals> <goal>compile</goal> <goal>test-compile</goal> </goals> </execution> </executions> </plugin> </plugins>
构建/ 编译生成 protobuf Message 类
mvn clean install
Unary 方式
编写 Java 接口
import org.apache.dubbo.hello.HelloReply; import org.apache.dubbo.hello.HelloRequest; public interface IGreeter { /** * <pre> * Sends a greeting * </pre> */ HelloReply sayHello(HelloRequest request); }
创建 Provider
public static void main(String[] args) throws InterruptedException { ServiceConfig<IGreeter> service = new ServiceConfig<>(); service.setInterface(IGreeter.class); service.setRef(new IGreeter1Impl()); // 这里需要显示声明使用的协议为triple service.setProtocol(new ProtocolConfig(CommonConstants.TRIPLE, 50051)); service.setApplication(new ApplicationConfig("demo-provider")); service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181")); service.export(); System.out.println("dubbo service started"); new CountDownLatch(1).await(); }
创建 Consumer
public static void main(String[] args) throws IOException { ReferenceConfig<IGreeter> ref = new ReferenceConfig<>(); ref.setInterface(IGreeter.class); ref.setCheck(false); ref.setProtocol(CommonConstants.TRIPLE); ref.setLazy(true); ref.setTimeout(100000); ref.setApplication(new ApplicationConfig("demo-consumer")); ref.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181")); final IGreeter iGreeter = ref.get(); System.out.println("dubbo ref started"); try { final HelloReply reply = iGreeter.sayHello(HelloRequest.newBuilder() .setName("name") .build()); TimeUnit.SECONDS.sleep(1); System.out.println("Reply:" + reply); } catch (Throwable t) { t.printStackTrace(); } System.in.read(); }
运行 Provider 和 Consumer ,可以看到请求正常返回
> Reply:message: "name"
stream 方式
编写 Java 接口
import org.apache.dubbo.hello.HelloReply; import org.apache.dubbo.hello.HelloRequest; public interface IGreeter { /** * <pre> * Sends greeting by stream * </pre> */ StreamObserver<HelloRequest> sayHello(StreamObserver<HelloReply> replyObserver); }
编写实现类
public class IStreamGreeterImpl implements IStreamGreeter { @Override public StreamObserver<HelloRequest> sayHello(StreamObserver<HelloReply> replyObserver) { return new StreamObserver<HelloRequest>() { private List<HelloReply> replyList = new ArrayList<>(); @Override public void onNext(HelloRequest helloRequest) { System.out.println("onNext receive request name:" + helloRequest.getName()); replyList.add(HelloReply.newBuilder() .setMessage("receive name:" + helloRequest.getName()) .build()); } @Override public void onError(Throwable cause) { System.out.println("onError"); replyObserver.onError(cause); } @Override public void onCompleted() { System.out.println("onComplete receive request size:" + replyList.size()); for (HelloReply reply : replyList) { replyObserver.onNext(reply); } replyObserver.onCompleted(); } }; } }
创建 Provider
public class StreamProvider { public static void main(String[] args) throws InterruptedException { ServiceConfig<IStreamGreeter> service = new ServiceConfig<>(); service.setInterface(IStreamGreeter.class); service.setRef(new IStreamGreeterImpl()); service.setProtocol(new ProtocolConfig(CommonConstants.TRIPLE, 50051)); service.setApplication(new ApplicationConfig("stream-provider")); service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181")); service.export(); System.out.println("dubbo service started"); new CountDownLatch(1).await(); } }
创建 Consumer
public class StreamConsumer { public static void main(String[] args) throws InterruptedException, IOException { ReferenceConfig<IStreamGreeter> ref = new ReferenceConfig<>(); ref.setInterface(IStreamGreeter.class); ref.setCheck(false); ref.setProtocol(CommonConstants.TRIPLE); ref.setLazy(true); ref.setTimeout(100000); ref.setApplication(new ApplicationConfig("stream-consumer")); ref.setRegistry(new RegistryConfig("zookeeper://mse-6e9fda00-p.zk.mse.aliyuncs.com:2181")); final IStreamGreeter iStreamGreeter = ref.get(); System.out.println("dubbo ref started"); try { StreamObserver<HelloRequest> streamObserver = iStreamGreeter.sayHello(new StreamObserver<HelloReply>() { @Override public void onNext(HelloReply reply) { System.out.println("onNext"); System.out.println(reply.getMessage()); } @Override public void onError(Throwable throwable) { System.out.println("onError:" + throwable.getMessage()); } @Override public void onCompleted() { System.out.println("onCompleted"); } }); streamObserver.onNext(HelloRequest.newBuilder() .setName("tony") .build()); streamObserver.onNext(HelloRequest.newBuilder() .setName("nick") .build()); streamObserver.onCompleted(); } catch (Throwable t) { t.printStackTrace(); } System.in.read(); } }
运行 Provider 和 Consumer ,可以看到请求正常返回了
> onNext\ > receive name:tony\ > onNext\ > receive name:nick\ > onCompleted
其他序列化方式
省略上文中的 1-3 步,指定 Provider 和 Consumer 使用的协议即可完成协议升级。
本文的示例可以在 triple-samples 找到
Feedback
Was this page helpful?
Glad to hear it! Please tell us how we can improve.
Sorry to hear that. Please tell us how we can improve.
最后修改 January 28, 2023: Update guide.md (#1949) (ea7489c3c)