之前我们讲了 grpc 怎么简单的使用 ,这次讲讲 grpc 中的 stream,stream 顾名思义 就是 一种 流,可以源源不断的 推送 数据,很适合 传输一些大数据,或者 服务端 和 客户端 长时间 数据交互,比如 客户端 可以向 服务端 订阅 一个数据,服务端 就 可以利用 stream ,源源不断地 推送数据。

grpc的四种模式.pptx

proto

  1. syntax = "proto3";//声明proto的版本 只能 是3,才支持 grpc
  2. //声明 包名
  3. option go_package=".;proto";
  4. //声明grpc服务
  5. service Greeter {
  6. /*
  7. 以下 分别是 服务端 推送流, 客户端 推送流 ,双向流。
  8. */
  9. rpc GetStream (StreamReqData) returns (stream StreamResData){}
  10. rpc PutStream (stream StreamReqData) returns (StreamResData){}
  11. rpc AllStream (stream StreamReqData) returns (stream StreamResData){}
  12. }
  13. //stream请求结构
  14. message StreamReqData {
  15. string data = 1;
  16. }
  17. //stream返回结构
  18. message StreamResData {
  19. string data = 1;
  20. }

服务端

  1. package main
  2. import (
  3. "fmt"
  4. "google.golang.org/grpc"
  5. "log"
  6. "net"
  7. "start/new_stream/proto"
  8. "sync"
  9. "time"
  10. )
  11. const PORT = ":50052"
  12. type server struct {
  13. }
  14. //服务端 单向流
  15. func (s *server)GetStream(req *proto.StreamReqData, res proto.Greeter_GetStreamServer) error{
  16. i:= 0
  17. for{
  18. i++
  19. res.Send(&proto.StreamResData{Data:fmt.Sprintf("%v",time.Now().Unix())})
  20. time.Sleep(1*time.Second)
  21. if i >10 {
  22. break
  23. }
  24. }
  25. return nil
  26. }
  27. //客户端 单向流
  28. func (s *server) PutStream(cliStr proto.Greeter_PutStreamServer) error {
  29. for {
  30. if tem, err := cliStr.Recv(); err == nil {
  31. log.Println(tem)
  32. } else {
  33. log.Println("break, err :", err)
  34. break
  35. }
  36. }
  37. return nil
  38. }
  39. //客户端服务端 双向流
  40. func(s *server) AllStream(allStr proto.Greeter_AllStreamServer) error {
  41. wg := sync.WaitGroup{}
  42. wg.Add(2)
  43. go func() {
  44. for {
  45. data, _ := allStr.Recv()
  46. log.Println(data)
  47. }
  48. wg.Done()
  49. }()
  50. go func() {
  51. for {
  52. allStr.Send(&proto.StreamResData{Data:"ssss"})
  53. time.Sleep(time.Second)
  54. }
  55. wg.Done()
  56. }()
  57. wg.Wait()
  58. return nil
  59. }
  60. func main(){
  61. //监听端口
  62. lis,err := net.Listen("tcp",PORT)
  63. if err != nil{
  64. panic(err)
  65. return
  66. }
  67. //创建一个grpc 服务器
  68. s := grpc.NewServer()
  69. //注册事件
  70. proto.RegisterGreeterServer(s,&server{})
  71. //处理链接
  72. err = s.Serve(lis)
  73. if err != nil {
  74. panic(err)
  75. }
  76. }

客户端

  1. package main
  2. import (
  3. "google.golang.org/grpc"
  4. "context"
  5. _ "google.golang.org/grpc/balancer/grpclb"
  6. "log"
  7. "start/new_stream/proto"
  8. "time"
  9. )
  10. const (
  11. ADDRESS = "localhost:50052"
  12. )
  13. func main(){
  14. //通过grpc 库 建立一个连接
  15. conn ,err := grpc.Dial(ADDRESS,grpc.WithInsecure())
  16. if err != nil{
  17. return
  18. }
  19. defer conn.Close()
  20. //通过刚刚的连接 生成一个client对象。
  21. c := proto.NewGreeterClient(conn)
  22. //调用服务端推送流
  23. reqstreamData := &proto.StreamReqData{Data:"aaa"}
  24. res,_ := c.GetStream(context.Background(),reqstreamData)
  25. for {
  26. aa,err := res.Recv()
  27. if err != nil {
  28. log.Println(err)
  29. break
  30. }
  31. log.Println(aa)
  32. }
  33. //客户端 推送 流
  34. putRes, _ := c.PutStream(context.Background())
  35. i := 1
  36. for {
  37. i++
  38. putRes.Send(&proto.StreamReqData{Data:"ss"})
  39. time.Sleep(time.Second)
  40. if i > 10 {
  41. break
  42. }
  43. }
  44. //服务端 客户端 双向流
  45. allStr,_ := c.AllStream(context.Background())
  46. go func() {
  47. for {
  48. data,_ := allStr.Recv()
  49. log.Println(data)
  50. }
  51. }()
  52. go func() {
  53. for {
  54. allStr.Send(&proto.StreamReqData{Data:"ssss"})
  55. time.Sleep(time.Second)
  56. }
  57. }()
  58. select {
  59. }
  60. }