一、protobuf介绍
Protobuf
(Prototcol Buffers)Google 开源的一种序列化方式。
Protobuf
官方实现了一门语言,专门用来自定义数据结构。protoc
是这门语言的编译工具,可编译生成指定编程语言(如C++
、Java
、Golang
、Python
、C#
等)的源代码,然后开发者可以轻松在这些语言中使用该源代码进行编程。
编译.proto
时,编译器会以您选择的语言生成代码,您需要使用文件中描述的消息类型,包括获取和设置字段值,将消息序列化到输出流,并从输入流中解析您的消息。
- 对于**C++**,编译器从 each 生成一个
.h
and.cc
文件.proto
,并为文件中描述的每种消息类型提供一个类。 - 对于Java,编译器会
.java
为每种消息类型生成一个包含类的文件,以及Builder
用于创建消息类实例的特殊类。 - 对于Kotlin,除了 Java 生成的代码之外,编译器还会
.kt
为每种消息类型生成一个文件,其中包含可用于简化创建消息实例的 DSL。 - Python有点不同——Python 编译器会生成一个模块,
.proto
其中包含您的 . - 对于Go,编译器会为
.pb.go
文件中的每种消息类型生成一个文件类型。 - 对于Ruby,编译器会生成一个
.rb
带有 Ruby 模块的文件,其中包含您的消息类型。 - 对于Objective-C,编译器从 each 生成一个
pbobjc.h
andpbobjc.m
文件.proto
,并为文件中描述的每种消息类型提供一个类。 - 对于**C#**,编译器会
.cs
从 each 生成一个文件.proto
,其中包含文件中描述的每种消息类型的类。 - 对于Dart,编译器会为
.pb.dart
文件中的每种消息类型生成一个包含类的文件。
二、编译proto生成java class
IDEA需要安装 protobuf 插件,如果 IDEA 自带有 protocol buffers 插件,则可以不用在装了
1、pom文件添加依赖
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.43.1</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.43.1</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.43.1</version>
</dependency>
<!-- necessary for Java 9+ -->
<dependency>
<groupId>org.apache.tomcat</groupId>
<artifactId>annotations-api</artifactId>
<version>6.0.53</version>
<scope>provided</scope>
</dependency>
2、pom文件添加插件
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.19.1:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.43.1:exe:${os.detected.classifier}</pluginArtifact>
<!-- 指定 gRPC 类文件生成路径 -->
<outputDirectory>${basedir}/src/main/java</outputDirectory>
<!-- maven clean 时不清空 -->
<clearOutputDirectory>false</clearOutputDirectory>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
3、编译
mvn clean
protobuf:compile -f pom.xml
protobuf:compile-custom -f pom.xml
即可在 target/generated-sources/protobuf
目录中生成对应的 java class
文件
三、protobuf 语法
1、类型结构
以下是一个 Protobuf
数据类型和其他编程语言的数据类型的映射关系表。
Protobuf Type | 说明 | C++ Type | Java Type | Python Type[2] | Go Type |
---|---|---|---|---|---|
float | 固定4个字节 | float | float | float | float32 |
double | 固定8个字节 | double | double | float | float64 |
int32 | varint编码 | int32 | int | int | int32 |
uint32 | varint编码 | uint32 | int | int/long | uint32 |
uint64 | varint编码 | uint64 | long | int/long | uint64 |
sint32 | zigzag 和 varint编码 | int32 | int | int | int32 |
sint64 | zigzag 和 varint编码 | int64 | long | int/long | int64 |
fixed32 | 固定4个字节 | uint32 | int | int | uint32 |
fixed64 | 固定8个字节 | uint64 | long | int/long | uint64 |
sfixed32 | 固定4个字节 | int32 | int | int | int32 |
sfixed64 | 固定8个字节 | int64 | long | int/long | int64 |
bool | 固定一个字节 | bool | boolean | bool | bool |
string | Lenth-Delimited | uint64 | String | str/unicode | string |
bytes | Lenth-Delimited | string | ByteString | str | []byte |
bytes | Lenth-Delimited | string | ByteString | str | []byte |
message Test {
//数据类型 字段 field-number (还是用英文原文好一点)
int32 i32 = 1;
int64 i64 = 2;
uint32 u32 = 3;
uint64 u64 = 4;
sint32 si32 = 5;
sint64 si64 = 6;
fixed32 fx32 = 7;
fixed64 fx64 = 8;
sfixed32 sfx32 = 9;
sfixed64 sfx64 = 10;
bool bl = 11;
float f32 = 12;
double d64 = 13;
string str = 14;
bytes bs = 15;
repeated int32 vec = 16;
map<int32, int32> mp = 17;
SubTest test = 18;
oneof object {
float obj_f32 = 19;
string obj_str = 20;
}
google.protobuf.Any any = 21;
enum Corpus {
UNIVERSAL = 0;
WEB = 1;
IMAGES = 2;
LOCAL = 3;
NEWS = 4;
PRODUCTS = 5;
VIDEO = 6;
}
}
2、修饰符
有两种修饰符
singular
:标识这个字段有0个或1个值(缺省时的默认值)repeated
:标识这个字段有多个值,可理解为集合(java 中会解析为 List)
message Result{
string name = 1;
singular string name2 = 2;
repeated string nameList = 3;
}
3、默认值
解析消息时,如果编码的消息不包含特定的奇异元素,则解析对象中的相应字段将设置为该字段的默认值。这些默认值是特定于类型的:
- 对于字符串,默认值为空字符串。
- 对于字节,默认值为空字节。
- 对于布尔值,默认值为 false。
- 对于数字类型,默认值为零。
- 对于enums,默认值是第一个定义的 enum value,它必须是 0。
- 对于消息字段,未设置该字段。它的确切值取决于语言。有关详细信息,请参阅生成的代码指南。
重复字段的默认值为空(通常是相应语言的空列表)。
请注意,对于标量消息字段,一旦解析了消息,就无法判断该字段是显式设置为默认值(例如布尔值是否设置为false
)还是根本没有设置:您应该牢记这一点定义消息类型时。例如,false
如果您不希望在默认情况下也发生该行为,则不要在设置为时打开某些行为的布尔值。另请注意,如果标量消息字段设置为其默认值,则该值将不会在线上序列化。
4、枚举值
在定义消息类型时,您可能希望其字段之一仅具有预定义的值列表之一。例如,假设您要corpus
为每个 添加一个字段,SearchRequest
其中语料库可以是UNIVERSAL
、WEB
、IMAGES
、LOCAL
、NEWS
或。您可以通过在消息定义中添加一个非常简单的方法来做到这一点,并为每个可能的值添加一个常量。PRODUCTSVIDEOenum
在下面的示例中,我们添加了一个包含所有可能值的enum
调用Corpus
,以及一个 type 字段Corpus
:
message SearchRequest {
string query = 1;
int32 page_number = 2;
int32 result_per_page = 3;
enum Corpus {
UNIVERSAL = 0;
WEB = 1;
IMAGES = 2;
LOCAL = 3;
NEWS = 4;
PRODUCTS = 5;
VIDEO = 6;
}
Corpus corpus = 4;
}
如您所见,Corpus
枚举的第一个常量映射到零:每个枚举定义都必须包含一个映射到零的常量作为其第一个元素。这是因为:
您可以通过将相同的值分配给不同的枚举常量来定义别名。为此,您需要将allow_alias
选项设置为true
,否则协议编译器将在找到别名时生成错误消息。
message MyMessage1 {
enum EnumAllowingAlias {
option allow_alias = true;
UNKNOWN = 0;
STARTED = 1;
RUNNING = 1;
}
}
message MyMessage2 {
enum EnumNotAllowingAlias {
UNKNOWN = 0;
STARTED = 1;
// RUNNING = 1; // Uncommenting this line will cause a compile error inside Google and a warning message outside.
}
}
5、分配字段编号
如您所见,消息定义中的每个字段都有一个唯一的编号(field-number)。这些字段编号用于在消息二进制格式中标识您的字段,并且在使用您的消息类型后不应更改。请注意,1 到 15 范围内的字段编号需要一个字节进行编码,包括字段编号和字段类型(您可以在Protocol Buffer Encoding中找到更多相关信息)。16 到 2047 范围内的字段编号占用两个字节。因此,您应该为非常频繁出现的消息元素保留数字 1 到 15。请记住为将来可能添加的频繁出现的元素留出一些空间。
您可以指定的最小字段编号是 1,最大的是 536,870,911。您也不能使用数字 19000 到 19999 ( FieldDescriptor::kFirstReservedNumber
through FieldDescriptor::kLastReservedNumber
),因为它们是为 Protocol Buffers 实现保留的——如果您在.proto
. 同样,您不能使用任何以前保留的字段编号。
四、proto文件示例
//指定了 proto3 的语法
syntax = "proto3";
//依赖的其他 proto 源文件,
//在依赖的数据类型在其他 proto 源文件中定义的情况下,
//需要通过 import 导入其他 proto 源文件
import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";
// proto生成java class是否是多个文件,如果是false,则以内部类方式整合到一个文件中
option java_multiple_files = true;
// 指定生成文件的 java 包
option java_package = "com.alibaba.nacos.api.grpc.auto";
// 指定生成最外层类的类名,不指定默认proto文件名下划线转驼峰
// 该类没啥用,只是java_multiple_files = false时用来管理内部类的
option java_outer_classname = "NacosGrpcService";
//message 是消息体,它就是一个结构体/类
message Metadata {
// 数据类型 字段名称 field-number
string type = 3;
string clientIp = 8;
map<string, string> headers = 7;
}
message Payload {
Metadata metadata = 2;
google.protobuf.Any body = 3;
}
// 定义服务,一个service可以定义多个 rpc 方法
service Request {
// 定义简单 rpc,客户端使用存根发送请求到服务器并等待响应返回,就像平常的函数调用一样。
rpc request (Payload) returns (Payload) {}
}
service BiRequestStream {
// 定义双向流式 rpc
rpc requestBiStream (stream Payload) returns (stream Payload) {}
}
五、gRPC概念
官方文档:https://grpc.io/docs/what-is-grpc/
中文文档:http://doc.oschina.net/grpc?t=58008
gRPC是谷歌开源的高性能RPC框架。
gRPC设计的核心思路:
- 支持异构:gRPC自己封装了网络通信部分,提供了多种语言的网络通信封装【C Java(netty) GO…】
- 协议:采用HTTP2,HTTP2传输数据时采用二进制方式,支持双向流(双工)的多路复用
- 序列化:采用
protobuf
的序列化方式,是基于二进制的,时间空间效率是 JSON (基于文本)的 3-5 倍 - 代理的创建:让 调用者像调用本地方法一样去调用远程方法,称为 stub(存根)
在 gRPC 里客户端应用可以像调用本地对象一样直接调用另一台不同的机器上服务端应用的方法,使得您能够更容易地创建分布式应用和服务。与许多 RPC 系统类似,gRPC 也是基于以下理念:定义一个服务,指定其能够被远程调用的方法(包含参数和返回类型)。在服务端实现这个接口,并运行一个 gRPC 服务器来处理客户端调用。在客户端拥有一个存根能够像服务端一样的方法。
1、概览
(1)服务定义
正如其他 RPC 系统,gRPC 基于如下思想:定义一个服务, 指定其可以被远程调用的方法及其参数和返回类型。gRPC 默认使用 protocol buffers 作为接口定义语言,来描述服务接口和有效载荷消息结构。如果有需要的话,可以使用其他替代方案。
service HelloService {
rpc SayHello (HelloRequest) returns (HelloResponse);
}
message HelloRequest {
required string greeting = 1;
}
message HelloResponse {
required string reply = 1;
}
gRPC 允许你定义四类服务方法:
- 单项 RPC,即客户端发送一个请求给服务端,从服务端获取一个应答,就像一次普通的函数调用。
rpc SayHello(HelloRequest) returns (HelloResponse){
}
- 服务端流式 RPC,即客户端发送一个请求给服务端,可获取一个数据流用来读取一系列消息。客户端从返回的数据流里一直读取直到没有更多消息为止。
rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse){
}
- 客户端流式 RPC,即客户端用提供的一个数据流写入并发送一系列消息给服务端。一旦客户端完成消息写入,就等待服务端读取这些消息并返回应答。
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse) {
}
- 双向流式 RPC,即两边都可以分别通过一个读写数据流来发送一系列消息。这两个数据流操作是相互独立的,所以客户端和服务端能按其希望的任意顺序读写,例如:服务端可以在写应答前等待所有的客户端消息,或者它可以先读一个消息再写一个消息,或者是读写相结合的其他方式。每个数据流里消息的顺序会被保持。
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse){
}
我们将在下面 RPC 生命周期章节里看到各类 RPC 的技术细节。
(2)使用 API 接口
gRPC 提供 protocol buffer 编译插件,能够从一个服务定义的 .proto 文件生成客户端和服务端代码。通常 gRPC 用户可以在服务端实现这些API,并从客户端调用它们。
- 在服务侧,服务端实现服务接口,运行一个 gRPC 服务器来处理客户端调用。gRPC 底层架构会解码传入的请求,执行服务方法,编码服务应答。
- 在客户侧,客户端有一个存根实现了服务端同样的方法。客户端可以在本地存根调用这些方法,用合适的 protocol buffer 消息类型封装这些参数— gRPC 来负责发送请求给服务端并返回服务端 protocol buffer 响应。
(3)同步 vs 异步
同步 RPC 调用一直会阻塞直到从服务端获得一个应答,这与 RPC 希望的抽象最为接近。另一方面网络内部是异步的,并且在许多场景下能够在不阻塞当前线程的情况下启动 RPC 是非常有用的。
在多数语言里,gRPC 编程接口同时支持同步和异步的特点。你可以从每个语言教程和参考文档里找到更多内容(很快就会有完整文档)。
2、RPC 生命周期
现在让我们来仔细了解一下当 gRPC 客户端调用 gRPC 服务端的方法时到底发生了什么。我们不究其实现细节,关于实现细节的部分,你可以在我们的特定语言页面里找到更为详尽的内容。
(1)单项/一元 RPC
首先我们来了解一下最简单的 RPC 形式:客户端发出单个请求,获得单个响应。
- 一旦客户端通过桩调用一个方法,服务端会得到相关通知 ,通知包括客户端的元数据,方法名,允许的响应期限(如果可以的话)
- 服务端既可以在任何响应之前直接发送回初始的元数据,也可以等待客户端的请求信息,到底哪个先发生,取决于具体的应用。
- 一旦服务端获得客户端的请求信息,就会做所需的任何工作来创建或组装对应的响应。如果成功的话,这个响应会和包含状态码以及可选的状态信息等状态明细及可选的追踪信息返回给客户端 。
- 假如状态是 OK 的话,客户端会得到应答,这将结束客户端的调用。
(2)服务端流式 RPC
服务端流式 RPC 除了在得到客户端请求信息后发送回一个应答流之外,与我们的简单例子一样。在发送完所有应答后,服务端的状态详情(状态码和可选的状态信息)和可选的跟踪元数据被发送回客户端,以此来完成服务端的工作。客户端在接收到所有服务端的应答后也完成了工作。
(3)客户端流式 RPC
客户端流式 RPC 也基本与我们的简单例子一样,区别在于客户端通过发送一个请求流给服务端,取代了原先发送的单个请求。服务端通常(但并不必须)会在接收到客户端所有的请求后发送回一个应答,其中附带有它的状态详情和可选的跟踪数据。
应用场景:物联网(IOT)传感器,比如车载gps,不断向服务器发送位置信息
(4)双向流式 RPC
双向流式 RPC ,调用由客户端调用方法来初始化,而服务端则接收到客户端的元数据,方法名和截止时间。服务端可以选择发送回它的初始元数据或等待客户端发送请求。 下一步怎样发展取决于应用,因为客户端和服务端能在任意顺序上读写 - 这些流的操作是完全独立的。例如服务端可以一直等直到它接收到所有客户端的消息才写应答,或者服务端和客户端可以像”乒乓球”一样:服务端后得到一个请求就回送一个应答,接着客户端根据应答来发送另一个请求,以此类推。
(5)截止时间
gRPC 允许客户端在调用一个远程方法前指定一个最后期限值。这个值指定了在客户端可以等待服务端多长时间来应答,超过这个时间值 RPC 将结束并返回DEADLINE_EXCEEDED
错误。在服务端可以查询这个期限值来看是否一个特定的方法已经过期,或者还剩多长时间来完成这个方法。 各语言来指定一个截止时间的方式是不同的 - 比如在 Python 里一个截止时间值总是必须的,但并不是所有语言都有一个默认的截止时间。
(6)RPC 终止
在 gRPC 里,客户端和服务端对调用成功的判断是独立的、本地的,他们的结论可能不一致。这意味着,比如你有一个 RPC 在服务端成功结束(“我已经返回了所有应答!”),到那时在客户端可能是失败的(“应答在最后期限后才来到!”)。也可能在客户端把所有请求发送完前,服务端却判断调用已经完成了。
(7)取消 RPC
无论客户端还是服务端均可以再任何时间取消一个 RPC 。一个取消会立即终止 RPC 这样可以避免更多操作被执行。它不是一个”撤销”, 在取消前已经完成的不会被回滚。当然,通过同步调用的 RPC 不能被取消,因为直到 RPC 结束前,程序控制权还没有交还给应用。
(8)元数据集
元数据是一个特殊 RPC 调用对应的信息(授权详情]) ,这些信息以键值对的形式存在,一般键的类型是字符串,值的类型一般也是字符串(当然也可以是二进制数据)。元数据对 gRPC 本事来说是不透明的 - 它让客户端提供调用相关的信息给服务端,反之亦然。 对于元数据的访问是语言相关的。
(9)流控制
TBD
(10)配置
TBD
(11)频道
在创建客户端存根时,一个 gRPC 频道提供一个特定主机和端口服务端的连接。客户端可以通过指定频道参数来修改 gRPC 的默认行为,比如打开关闭消息压缩。一个频道具有状态,包含已连接
和空闲
。 gRPC 如何处理关闭频道是语言相关的。有些语言可允许询问频道状态。
六、gRPC定义服务
要定义一个服务,你必须在你的 .proto 文件中指定 service
:
service RouteGuide {
...
}
然后在我们的服务中定义 rpc
方法,指定它们的请求的和响应类型。gRPC 允许你定义4种类型的 service 方法,这些都在 RouteGuide
服务中使用:
- 一个 简单 RPC , 客户端使用存根发送请求到服务器并等待响应返回,就像平常的函数调用一样。
// Obtains the feature at a given position.
rpc GetFeature(Point) returns (Feature) {}
- 一个 服务器端流式 RPC , 客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息。从例子中可以看出,通过在 响应 类型前插入
stream
关键字,可以指定一个服务器端的流方法。
// Obtains the Features available within the given Rectangle. Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
- 一个 客户端流式 RPC , 客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦 客户端完成写入消息,它等待服务器完成读取返回它的响应。通过在 请求 类型前指定
stream
关键字来指定一个客户端的流方法。
// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
- 一个 双向流式 RPC 是双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器 可以以任意喜欢的顺序读写:比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替 的读取和写入消息,或者其他读写的组合。 每个流中的消息顺序被预留。你可以通过在请求和响应前加
stream
关键字去制定方法的类型。
// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
我们的 .proto 文件也包含了所有请求的 protocol buffer 消息类型定义以及在服务方法中使用的响 应类型——比如,下面的Point
消息类型:
// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
七、java
1、开发流程
1、
xxx-api
模块
定义 protobuf 的 IDL 语言,并通过命令创建具体代码,然后 server 、client 引入 api 模块
2、
xxx-server
模块
- 实现 api 模块中定义的服务接口
- 发布 gRPC 服务
3、
xxx-client
模块
创建服务端 stub (代理/存根)
基于 stub (代理)进行 RPC 调用
2、proto文件
该文件中定义了 4 种 rpc
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.rewind.grpc.helloworld";
option java_outer_classname = "HelloWorldService";
package helloworld;
// 简单 rpc
service Greeter {
rpc SayHello (HelloRequest) returns (HelloResponse) {}
}
// 双向流式 rpc
service GreeterStream {
rpc SayHelloStream (stream HelloRequest) returns (stream HelloResponse) {}
}
// 服务端流式 rpc
service GreeterServerStream {
rpc SayHelloServerStream (HelloRequest) returns (stream HelloResponse) {}
}
// 客户端流式 rpc
service GreeterClientStream {
rpc SayHelloClientStream (stream HelloRequest) returns (HelloResponse) {}
}
message HelloRequest {
string name = 1;
map<string, string> headers = 7;
}
message HelloResponse {
string message = 1;
}
生成文件一览
3、创建服务器
创建服务器有两个部分:
- 实现我们服务定义的生成的rpc服务接口:做我们的服务的实际的“工作”。
- 运行一个 gRPC 服务器,监听来自客户端的请求并返回服务的响应。
服务端需要有个类继承上面 grpc-java
包下的 XXXGrpc.XXXImplBase
class GreeterAcceptor extends XXXGrpc.XXXImplBase {
// ...
}
(1)一元 rpc
先定义服务端逻辑代码
public class GreeterAcceptor extends GreeterGrpc.GreeterImplBase {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
Map<String, String> header = request.getHeadersMap();
System.out.println("服务端执行方法:" + header);
// 封装响应对象
HelloResponse response = HelloResponse
.newBuilder()
.setMessage("Hello " + request.getName())
.build();
// 将 response 通过流返回客户端
responseObserver.onNext(response);
// 流处理完成通知,调用之后无法在对流进行操作
responseObserver.onCompleted();
}
}
重写父类方法 sayHello()
接收两个参数:
HelloRequest
:自定义请求实体类StreamObserver<HelloResponse>
: 一个应答的观察者,实际上是服务器调用它应答的一个特殊接口。
要将应答返回给客户端,并完成调用:
- 如在我们的服务定义中指定的那样,我们组织并填充一个
HelloResponse
应答对象返回给客户端。 - 我们使用应答观察者的
onNext()
方法返回HelloResponse
。 - 我们使用应答观察者的
onCompleted()
方法来指出我们已经完成了和 RPC的交互。
(2)服务端流式 rpc
/**
* 服务端流式 rpc
* 需要将多个 Response 发送回 客户端
* 客户端接收到 Iterator 类型的数据
*/
public class GreeterServerStreamAcceptor extends GreeterServerStreamGrpc.GreeterServerStreamImplBase {
@Override
public void sayHelloServerStream(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
System.out.println(request.getHeadersMap());
System.out.println(request.getName());
for (int i = 0; i < 10; i++) {
HelloResponse response = HelloResponse.newBuilder().setMessage("消息id" + i).build();
responseObserver.onNext(response);
}
responseObserver.onCompleted();
}
}
(3)客户端流式 rpc
/**
* 客户端流式 rpc
* 接收客户端传过来的流
*/
public class GreeterClientStreamAccepter extends GreeterClientStreamGrpc.GreeterClientStreamImplBase {
@Override
public StreamObserver<HelloRequest> sayHelloClientStream(StreamObserver<HelloResponse> responseObserver) {
return new StreamObserver<HelloRequest>() {
int receiveCount;
/**
* 客户端向流发送一次数据时调用
*/
@Override
public void onNext(HelloRequest request) {
receiveCount++;
System.out.println("接收到客户端传来的一条数据:" + request.getName());
}
/**
* 发生异常后调用
*/
@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}
/**
* 客户端结束写入消息时调用
*/
@Override
public void onCompleted() {
responseObserver.onNext(HelloResponse.newBuilder().setMessage("收到").build());
responseObserver.onCompleted();
System.out.println("服务端消息结束接收");
}
};
}
}
我们的方法和前面的方法类型相似,拿到一个 StreamObserver
应答观察者参数,但是这次它返回一个 StreamObserver
以便客户端写入它的 request
。
在这个方法体中,我们返回了一个匿名 StreamObserver
实例,其中我们:
- 覆写了
onNext()
方法,每次客户端写入一个request
到消息流时,拿到特性和其它信息。 - 覆写了
onCompleted()
方法(在 客户端 结束写入消息时调用),用来填充和构建我们的response
。然后我们用response
调用方法自己的的响应观察者的onNext()
,之后调用它的onCompleted()
方法,结束服务器端的调用。
(4)双向流式 rpc
public class GreeterStreamAcceptor extends GreeterStreamGrpc.GreeterStreamImplBase {
@Override
public StreamObserver<HelloRequest> sayHelloStream(StreamObserver<HelloResponse> responseObserver) {
return new StreamObserver<HelloRequest>() {
/**
* 接收到客户端数据时调用
*/
@Override
public void onNext(HelloRequest request) {
HelloResponse response = HelloResponse.newBuilder().setMessage("收到:" + request.getName()).build();
responseObserver.onNext(response);
}
@Override
public void onError(Throwable t) {
}
/**
* 客户端请求流结束时调用
*/
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
}
和我们的客户端流的例子一样,我们拿到和返回一个 StreamObserver
应答观察者,除了这次我们在客户端仍然写入消息到 它们的 消息流时通过我们方法的应答观察者返回值。这里读写的语法和客户端流以及服务器流方法一样。虽然每一端都会按照它们写入的顺序拿到另一端的消息,客户端和服务器都可以任意顺序读写——流的操作是互不依赖的。
(5)服务器启动
public class HelloWorldServer {
private static final Logger logger = Logger.getLogger(HelloWorldServer.class.getName());
private Server server;
private void start(int port) throws IOException {
// grpc 服务端启动
server = ServerBuilder.forPort(port)
// 指定使用的线程池
//.executor(getRpcExecutor())
.addService(new GreeterAcceptor())
.build()
.start();
logger.info("Server started, listening on " + port);
// JVM 关闭回调,JVM 关闭时,将 grpc 也关闭
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
try {
HelloWorldServer.this.stop();
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
System.err.println("*** server shut down");
}
});
}
private void stop() throws InterruptedException {
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}
/**
* 在主线程上等待终止,因为grpc库使用守护线程。
*/
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
public static void main(String[] args) throws IOException, InterruptedException {
final HelloWorldServer server = new HelloWorldServer();
server.start(50051);
// 主线程阻塞等待,因为grpc为守护线程,主线程停止grpc线程也结束
server.blockUntilShutdown();
}
}
实际上启动代码就下面这段
Server server = ServerBuilder
.forPort(port) // 设置监听端口
.addService(new GreeterAcceptor()) // 设置 rpc 逻辑
.build()
.start(); // 启动 grpc
4、创建客户端
(1)创建客户端stub
为了调用服务方法,我们需要首先创建一个 存根,或者两个存根:
- 一个 阻塞/同步 存根:这意味着 RPC 调用等待服务器响应,并且要么返回应答,要么造成异常。
- 一个 非阻塞/异步 存根可以向服务器发起非阻塞调用,应答会异步返回。你可以使用异步存根去发起特定类型的流式调用。
我们首先为存根创建一个 gRPC channel,指明服务器地址和我们想连接的端口号:
channel = NettyChannelBuilder.forAddress(host, port)
.negotiationType(NegotiationType.PLAINTEXT)
.build();
如你所见,我们用一个 NettyServerBuilder
构建和启动服务器。这个服务器的生成器基于 Netty 传输框架。
我们使用 Netty 传输框架,所以我们用一个 NettyServerBuilder
启动服务器。
现在我们可以通过从 .proto 中生成的 RouteGuideGrpc
类的 newStub
和 newBlockingStub
方法,使用频道去创建我们的存根。
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
完整代码如下
public class HelloWorldClient {
private static final Logger logger = Logger.getLogger(HelloWorldClient.class.getName());
private final GreeterGrpc.GreeterBlockingStub blockingStub;
// 初始化 blockingStub ,应保证只在程序启动的时候初始化一次
public HelloWorldClient(String target) {
// 创建客户端到服务器的通信通道,称为channel。
// 通道是线程安全且可重用的。通常在应用程序开始时创建通道,并在应用程序关闭之前重用它们。
ManagedChannel channel = ManagedChannelBuilder.forTarget(target)
// 默认情况下通道是安全的(通过SSL/TLS)。对于本例,我们禁用TLS以避免需要证书。
.usePlaintext()
.build();
blockingStub = GreeterGrpc.newBlockingStub(channel);
// JVM 关闭回调,JVM 关闭时,将 grpc 也关闭
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// ManagedChannels使用线程和TCP连接等资源。
// 为了防止泄漏这些资源,应该在通道不再使用时关闭它。如果可能再次使用,请保持运行状态。
try {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
/**
* 简单rpc调用
*/
public void greet(String name) {
HashMap<String, String> map = new HashMap<>();
map.put("token", "123456");
HelloRequest request = HelloRequest.newBuilder()
.setName(name)
.putAllHeaders(map)
.build();
HelloResponse response;
try {
// 调用服务端方法
response = blockingStub.sayHello(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
e.printStackTrace();
return;
}
logger.info("Greeting: " + response.getMessage());
}
public static void main(String[] args) throws Exception {
String userName = "world";
String target = "localhost:50051";
HelloWorldClient client = new HelloWorldClient(target);
client.greet(userName);
}
}
(2)一元 rpc
GreeterGrpc.GreeterBlockingStub blockingStub = GreeterGrpc.newBlockingStub(channel);
// 简单 rpc 调用
HelloResponse response = blockingStub.sayHello(request);
(3)服务端流式 rpc
GreeterServerStreamGrpc.GreeterServerStreamBlockingStub greeterServerStreamBlockingStub = GreeterServerStreamGrpc.newBlockingStub(channel);
// 服务端流式 rpc
Iterator<HelloResponse> responseIterator = stub.sayHelloServerStream(request);
// 会阻塞等待服务端返回结果
while (responseIterator.hasNext()) {
System.out.println(responseIterator.next().getMessage());
}
(4)客户端流式 rpc
// 客户端流式 rpc, 注意调用的是 newStub
GreeterClientStreamGrpc.GreeterClientStreamStub greeterClientStreamStub = GreeterClientStreamGrpc.newStub(channel);
public void clientStreamRpc(){
final SettableFuture<String> finishFuture = SettableFuture.create();
StreamObserver<HelloResponse> responseStreamObserver = new StreamObserver<HelloResponse>() {
/**
* 接收到服务发送的数据时调用
*/
@Override
public void onNext(HelloResponse response) {
System.out.println(response);
}
/**
* 发生异常时调用
*/
@Override
public void onError(Throwable t) {
finishFuture.setException(t);
}
/**
* 流结束时调用
*/
@Override
public void onCompleted() {
System.out.println("流结束");
finishFuture.set("end");
}
};
StreamObserver<HelloRequest> requestStreamObserver = greeterClientStreamStub.sayHelloClientStream(responseStreamObserver);
for (int i = 0; i < 10; i++) {
HelloRequest request = HelloRequest.newBuilder().setName("name" + i).build();
requestStreamObserver.onNext(request);
}
requestStreamObserver.onCompleted();
try {
// 阻塞等待
System.out.println("结束" + finishFuture.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
为了调用这个方法我们需要创建一个 StreamObserver
,它为了服务器用它的 RouteSummary
应答实现了一个特殊的接口。在 StreamObserver
中,我们:
- 覆写了
onNext()
方法,在服务器把response
写入到消息流时,打印出返回的信息。 - 覆写了
onCompleted()
方法(在 服务器 完成自己的调用时调用)去设置SettableFuture
,这样我们可以检查服务器是不是完成写入。
之后,我们将 StreamObserver
传给异步存根的 sayHelloClientStream()
方法,拿到我们自己的 StreamObserver
请求观察者将 request
发给服务器。一旦完成点的写入,我们使用请求观察者的 onCompleted()
方法告诉 gRPC 我们已经完成了客户端的写入。一旦完成,我们就检查 SettableFuture
验证服务器是否已经完成写入。
(5)双向流式 rpc
GreeterStreamGrpc.GreeterStreamStub greeterStreamStub = GreeterStreamGrpc.newStub(channel);
/**
* 双向流式 rpc
*/
public void streamRpc(){
final SettableFuture<String> finishFuture = SettableFuture.create();
StreamObserver<HelloResponse> responseStreamObserver = new StreamObserver<HelloResponse>() {
/**
* 接收到服务发送的数据时调用
*/
@Override
public void onNext(HelloResponse response) {
System.out.println(response.getMessage());
}
/**
* 发生异常时调用
*/
@Override
public void onError(Throwable t) {
finishFuture.setException(t);
}
/**
* 流结束时调用
*/
@Override
public void onCompleted() {
System.out.println("流结束");
finishFuture.set("end");
}
};
StreamObserver<HelloRequest> requestStreamObserver = greeterStreamStub.sayHelloStream(responseStreamObserver);
for (int i = 0; i < 10; i++) {
HelloRequest request = HelloRequest.newBuilder().setName("id" + i).build();
requestStreamObserver.onNext(request);
}
requestStreamObserver.onCompleted();
try {
System.out.println(finishFuture.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
和我们的客户端流的例子一样,我们拿到和返回一个 StreamObserver
应答观察者,除了这次我们在客户端仍然写入消息到 它们的 消息流时通过我们方法的应答观察者返回值。这里读写的语法和客户端流以及服务器流方法一样。虽然每一端都会按照它们写入的顺序拿到另一端的消息,客户端和服务器都可以任意顺序读写——流的操作是互不依赖的。
5、StreamObserver API
(1)onNext(V value)
向流设置一个值。
可以多次调用,但在 onError(Throwable)
或 onCompleted()
被调用后不能再次调用。
非流式调用最多调用 onNext
一次。对于服务器流调用,客户端最多只能调用 onNext
一次,但可能会收到多次 onNext
回调。对于客户端流调用,服务器可能最多调用 onNext
一次,但可能会收到多次 onNext
回调。
如果实现抛出异常,则调用者应在传播捕获的异常之前通过调用 onError(Throwable)
来终止流。
(2)onCompleted()
接收流成功完成的通知。
只能被调用一次,如果被调用,它必须是最后一个被调用的方法。特别是,如果onCompleted的实现抛出异常,则不允许对任何方法进行进一步调用。
(3)onError(Throwable t)
从流接收终止错误。
只能被调用一次,如果被调用,它必须是最后一个被调用的方法。特别是,如果onError的实现抛出异常,则不允许进一步调用任何方法。
6、三种客户端代理
(1)newBlockingStub
顾名思义,这个是阻塞调用的gRPC客户端类型,实际使用中跟HTTP接口请求->响应
一样。可以支持一元和服务端流式调用。
(2)newStub
纯异步调用客户端。可以支持所有的服务调用
(3)newFutureStub
这种客户端也是异步的,它具有同步客户端的属性,在实际使用中,既可以当做异步客户端使用也可以当做一个同步的客户端使用。
7、服务端拦截器
请求被具体的Handler相应前。
使用场景
a)访问认证
b)请求日志记录及监控
c)代理转发
(1)基本使用
public class MyGrpcServerInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
return next.startCall(call, headers);
}
}
参数:
call:ServerCall 对象,包含客户端请求的 MethodDescriptor
headers:请求头信息
next:处理链条上的下一个处理。
(2)扩展使用
public class MyGrpcServerInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
//提取认证信息
String id = headers.get(Metadata.Key.of("id", Metadata.ASCII_STRING_MARSHALLER));
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(next.startCall(call, headers)) {
private long startTime = 0; //处理开始时间
private ReqT request;
private boolean valid = false; //认证状态
@Override
public void onComplete() {
//记录请求参数及耗时
System.out.println("process cost: " + (System.nanoTime() - startTime));
System.out.println("process param: " + request.toString());
super.onComplete();
}
@Override
public void onMessage(ReqT message) {
startTime = System.nanoTime();
request = message;
if (StringUtils.equals("king", id)) {
super.onMessage(message);
} else {
valid = false;
}
}
@Override
public void onHalfClose() {
//验证失败则返回 Status.UNAUTHENTICATED
if (!valid) {
call.close(Status.UNAUTHENTICATED.withDescription("auth failed"), new Metadata());
} else {
super.onHalfClose();
}
}
};
}
}
onMessage
:接收到请求时进行相应处理,我们这记录处理开始时间,及请求参数,同时根据提取的认证信息进行访问验证,验证通过则继续后续处理,否则设置认证状态为 false。
onHalfClose
:处理认证标示及返回。
onComplete
:处理结束记录请求参数及耗时。
8、客户端拦截器
作用时机:请求被分发出去之前。
使用场景
a)、请求日志记录及监控
b)、添加请求头数据、以便代理转发使用
c)、请求或者结果重写
通常,如果要提供认证信息的话,可以使用 CallCredentials
实现(见下面的 JWT安全认证),虽然,拦截器里也可以通过设置 CallOptions
来提供。
(1)简单案例
实现ClientInterceptor并重写interceptCall
public class MyGrpcClientInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return next.newCall(method, callOptions);
}
}
它只有一个方法:interceptCall,对于注册了相应拦截器的客户端调用,都要经过这个方法,
参数:
1、method:MethodDescriptor 类型,标示请求方法。包括方法全限定名称、请求服务名称、请求、结果、序列化工具、幂等等。
2、callOptions:此次请求的附带信息。
3、next:执行此次 RPC 请求的抽象链接管道(Channel)
返回结果:
ClientCall,包含请求及结果信息,并且不为null。
上面的例子中,没有实现任何逻辑,直接执行了 next.newCall
继续执行客户端的此次调用。
next.newCall
只能在当前上下文中执行,每次调用以及返回都必须是一个完整地回路,逃逸使用会导致不必要的内存泄漏问题。
客户端添加设置拦截器
ManagedChannel channel = ManagedChannelBuilder
.forAddress("localhost", 50051)
// 添加拦截器
.intercept(new MyGrpcClientInterceptor())
.usePlaintext()
.build();
(2)超时\认证\日志记录
public class MyGrpcClientInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
CallOptions myCallOptions = callOptions
.withDeadlineAfter(500, TimeUnit.MILLISECONDS) //设置超时
.withCallCredentials(new CallCredentials() { //设置认证信息
@Override
public void applyRequestMetadata(RequestInfo requestInfo, Executor appExecutor, MetadataApplier applier) {
Metadata metadata = new Metadata();
metadata.put(Metadata.Key.of("id", Metadata.ASCII_STRING_MARSHALLER), "king");
applier.apply(metadata);
}
@Override
public void thisUsesUnstableApi() {}
});
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, myCallOptions)) {
@Override
public void sendMessage(ReqT message) {
System.out.println("request method: " + method.getFullMethodName());
System.out.println("request param:" + message.toString());
super.sendMessage(message);
}
};
}
}
ForwardingClientCall:ClientCall 的一个抽象实现类,用以请求的代理转发。为什么我们这里要用这个类呢?
其实我们完全可以直接使用 ClientCall 实现,只不过作为顶级抽象类,我们必须要实现很多方法。而使用 ForwardingClientCall,则我们只需要去重写我们需要的方法就可以。
如上代码:sendMessage 发送消息到请求服务器,可能会执行多次。此处我们记录相应的请求参数信息。
9、Jwt安全认证
(1)服务端拦截器
public class JwtServerInterceptor implements ServerInterceptor {
static final Metadata.Key<String> AUTHORIZATION_METADATA_KEY = Metadata.Key.of("Authorization", Metadata.ASCII_STRING_MARSHALLER);
static final Context.Key<String> CLIENT_ID_CONTEXT_KEY = Context.key("clientId");
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,Metadata headers,ServerCallHandler<ReqT, RespT> next) {
Status status = Status.OK;
String token = headers.get(AUTHORIZATION_METADATA_KEY);
if (StringUtils.isEmpty(token)){
status = Status.UNAUTHENTICATED.withDescription("Authorization token is missing");
} else{
// 解析token
// ...
// 将客户端id设置为当前上下文
Context ctx = Context.current()
.withValue(CLIENT_ID_CONTEXT_KEY, "token解析出来的信息");
return Contexts.interceptCall(ctx, call, headers, next);
}
call.close(status, new Metadata());
return new ServerCall.Listener<ReqT>() {
// noop
};
}
}
(2)启动服务端
public class JwtGrpcServer {
public static void main(String[] args) throws IOException, InterruptedException {
Server server = Grpc.newServerBuilderForPort(50051, InsecureServerCredentials.create())
.addService(new GreeterAcceptor())
.intercept(new JwtServerInterceptor())
.build();
server.start();
server.awaitTermination();
}
}
(3)客戶端凭据拦截器
public class JwtCredential extends CallCredentials {
@Override
public void applyRequestMetadata(RequestInfo requestInfo, Executor executor, MetadataApplier applier) {
String token = "自定义token";
executor.execute(new Runnable() {
@Override
public void run() {
try {
Metadata headers = new Metadata();
headers.put(AUTHORIZATION_METADATA_KEY,
String.format("%s %s", "Bearer", token));
applier.apply(headers);
} catch (Throwable e) {
applier.fail(Status.UNAUTHENTICATED.withCause(e));
}
}
});
}
@Override
public void thisUsesUnstableApi() {
}
}
(4)启动客户端
public class JwtGrpcClient {
public static void main(String[] args) {
// 创建客户端到服务器的通信通道,称为channel。
// 通道是线程安全且可重用的。通常在应用程序开始时创建通道,并在应用程序关闭之前重用它们。
ManagedChannel channel = ManagedChannelBuilder
//.forTarget(target)
.forAddress("localhost", 50051)
// 默认情况下通道是安全的(通过SSL/TLS)。对于本例,我们禁用TLS以避免需要证书。
.usePlaintext()
.build();
// 简单 rpc
GreeterGrpc.GreeterBlockingStub blockingStub = GreeterGrpc.newBlockingStub(channel);
HelloRequest request = HelloRequest.newBuilder().setName("rewind").build();
// 请求并添加凭据
HelloResponse response = blockingStub.withCallCredentials(new JwtCredential()).sayHello(request);
System.out.println(response.getMessage());
}
}