随着PR https://github.com/apache/incubator-seata/pull/6754 的合并,Seata Server能够做到识别并处理Grpc请求,这意味着任意语言客户端,只需要引入proto文件,就可以和部署在JVM上的Seata Server通信,进而实现分布式事务的全流程。
下面以Go语言为例,向大家演示这一过程。
环境准备
Goland 2024.2
Idea 2024.3
jdk 1.8
go 1.23.3
Seata 2.3.0-SNAPSHOT
libprotoc 3.21.0
操作过程
部署并启动 Seata Server
运行 org.apache.seata.server.ServerApplication#main,如下所示
proto文件导入
在go项目中导入完成本次事务流程所需的proto文件,包括各类事务请求和响应的proto文件和发起RPC的proto文件。如下所示
grpc相关文件生成
在上一步导入的proto文件目录下,执行命令
protoc --go_out=. --go-grpc_out=. .\*.proto
执行完后会生成grpc代码,如下所示
grpc调用
在main.go中完成一次分布式事务的流程,并打印Seata Server的响应,代码如下所示
func main() {
conn, err := grpc.Dial(":8091", grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
client := pb.NewSeataServiceClient(conn)
stream, err := client.SendRequest(context.Background())
if err != nil {
log.Fatalf("could not sendRequest: %v", err)
}
defer stream.CloseSend()
sendRegisterTm(stream)
xid := sendGlobalBegin(stream)
sendBranchRegister(stream, xid)
sendGlobalCommit(stream, xid)
}
func sendRegisterTm(stream grpc.BidiStreamingClient[pb.GrpcMessageProto, pb.GrpcMessageProto]) {
abstractIdentifyRequestProto := &pb.AbstractIdentifyRequestProto{
ApplicationId: "test-applicationId",
}
registerTMRequestProto := &pb.RegisterTMRequestProto{
AbstractIdentifyRequest: abstractIdentifyRequestProto,
}
registerTMResponseProto := &pb.RegisterTMResponseProto{}
sendMessage(stream, registerTMRequestProto, registerTMResponseProto)
}
func sendGlobalBegin(stream grpc.BidiStreamingClient[pb.GrpcMessageProto, pb.GrpcMessageProto]) string {
globalBeginRequestProto := &pb.GlobalBeginRequestProto{
TransactionName: "test-transactionName",
Timeout: 200,
}
globalBeginResponseProto := &pb.GlobalBeginResponseProto{}
sendMessage(stream, globalBeginRequestProto, globalBeginResponseProto)
return globalBeginResponseProto.Xid
}
func sendBranchRegister(stream grpc.BidiStreamingClient[pb.GrpcMessageProto, pb.GrpcMessageProto], xid string) {
branchRegisterRequestProto := &pb.BranchRegisterRequestProto{
Xid: xid,
LockKey: "1",
ResourceId: "test-resourceId",
BranchType: pb.BranchTypeProto_AT,
ApplicationData: "{\"mock\":\"mock\"}",
}
branchRegisterResponseProto := &pb.BranchRegisterResponseProto{}
sendMessage(stream, branchRegisterRequestProto, branchRegisterResponseProto)
}
func sendGlobalCommit(stream grpc.BidiStreamingClient[pb.GrpcMessageProto, pb.GrpcMessageProto], xid string) {
abstractGlobalEndRequestProto := &pb.AbstractGlobalEndRequestProto{
Xid: xid,
}
globalCommitRequestProto := &pb.GlobalCommitRequestProto{
AbstractGlobalEndRequest: abstractGlobalEndRequestProto,
}
globalCommitResponseProto := &pb.GlobalCommitResponseProto{}
sendMessage(stream, globalCommitRequestProto, globalCommitResponseProto)
}
func sendMessage(stream grpc.BidiStreamingClient[pb.GrpcMessageProto, pb.GrpcMessageProto], req proto.Message, response proto.Message) {
anyMsg, err := anypb.New(req)
if err != nil {
log.Fatalf("could not new any msg: %v", err)
}
marshal, err := proto.Marshal(anyMsg)
msg := &pb.GrpcMessageProto{
HeadMap: map[string]string{},
Body: marshal,
}
err = stream.Send(msg)
if err != nil {
log.Fatalf("could not send msg: %v", err)
}
resp, err := stream.Recv()
if err != nil {
log.Fatalf("failed to receive message: %v", err)
}
body := resp.Body
var anyMessage anypb.Any
err = proto.Unmarshal(body, &anyMessage)
if err != nil {
log.Fatalf("failed to unmarshal to any: %v", err)
}
err = anypb.UnmarshalTo(&anyMessage, response, proto.UnmarshalOptions{})
if err != nil {
log.Fatalf("failed to unmarshal to message: %v", err)
}
log.Printf("Received: %+v", response)
}