getter.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. package main
  2. import "surdeus.su/core/amo"
  3. import "surdeus.su/core/ss/urlenc"
  4. import "surdeus.su/core/cli/mtool"
  5. //import "fmt"
  6. import "log"
  7. import "time"
  8. import "os"
  9. import "os/signal"
  10. import "encoding/json"
  11. type Getter[V any] interface {
  12. GetValues(*amo.Client, ...urlenc.Builder) ([]V, amo.NextFunc[[]V], error)
  13. GetNameMul() string
  14. GetFuncName() string
  15. }
  16. func RunGetter[V any, G Getter[V]](g G, flags *mtool.Flags) {
  17. var (
  18. opts DefaultFlags
  19. )
  20. now := time.Now()
  21. MakeDefaultFlags(&opts, flags)
  22. MakeGetterFlags(&opts, flags)
  23. idStrs := flags.Parse()
  24. c, err := amo.NewClient(opts.SecretPath)
  25. if err != nil {
  26. log.Fatalf("NewAmoClient(...): %s\n", err)
  27. }
  28. c.API.SetMRPS(opts.MRPS)
  29. enc := json.NewEncoder(os.Stdout)
  30. if opts.Indent {
  31. enc.SetIndent("", " ")
  32. }
  33. inter := make(chan os.Signal, 1)
  34. signal.Notify(inter, os.Interrupt)
  35. go func() {
  36. <-inter
  37. os.Stdout.Sync()
  38. os.Exit(0)
  39. }()
  40. finalNum := 0
  41. if opts.All {
  42. page := opts.StartPage
  43. values, next, err := g.GetValues(
  44. c,
  45. urlenc.Value[int]{"page", page},
  46. urlenc.Value[int]{"limit", opts.MEPR},
  47. )
  48. if err != nil {
  49. log.Fatalf(
  50. "%s(...): %s\n", g.GetFuncName(), err,
  51. )
  52. }
  53. finalNum += len(values)
  54. if opts.Verbose {
  55. log.Printf("Got %d %s (%d, page %d)\n",
  56. len(values), g.GetNameMul(), finalNum, page)
  57. }
  58. for _, value := range values {
  59. err := enc.Encode(value)
  60. if err != nil {
  61. log.Fatalf("json.Encode(...): %s\n", err)
  62. }
  63. }
  64. page++
  65. for page <= opts.EndPage && next != nil {
  66. values, next, err = next()
  67. if err != nil {
  68. log.Fatalf(
  69. "%s(...): %s\n",
  70. g.GetFuncName(), err,
  71. )
  72. }
  73. finalNum += len(values)
  74. if opts.Verbose {
  75. log.Printf("Got %d %s (%d, page %d)\n",
  76. len(values), g.GetNameMul(),
  77. finalNum, page)
  78. }
  79. for _, value := range values {
  80. err = enc.Encode(value)
  81. if err != nil {
  82. log.Fatalf("json.Encode(...): %s\n", err)
  83. }
  84. }
  85. page++
  86. }
  87. }
  88. ids := ReadIDs(idStrs)
  89. if len(ids) == 0 {
  90. log.Fatalf("Got no IDs to read %s", g.GetNameMul())
  91. return
  92. }
  93. valueChan := make(chan []V)
  94. finish := RunForSliceInThreads[int](
  95. opts.Threads, opts.MEPR,
  96. ids, func(thread int, s []int){
  97. values, _, err := g.GetValues(
  98. c,
  99. urlenc.Array[int]{
  100. "id",
  101. s,
  102. },
  103. urlenc.Value[string]{
  104. "with",
  105. "contacts",
  106. },
  107. urlenc.Value[int]{
  108. "limit",
  109. opts.MEPR,
  110. },
  111. )
  112. if err != nil {
  113. log.Printf("thread(%d): %s(...): %s\n",
  114. thread, g.GetFuncName(), err)
  115. return
  116. }
  117. valueChan <- values
  118. if opts.Verbose {
  119. log.Printf(
  120. "thread(%d): Got %d %s\n",
  121. thread, len(values),
  122. g.GetNameMul(),
  123. )
  124. }
  125. },
  126. )
  127. //var wg sync.WaitGroup
  128. go func(){
  129. // Waiting for appending so we do not lose data.
  130. <-finish
  131. for len(valueChan) > 0 {}
  132. close(valueChan)
  133. }()
  134. for values := range valueChan {
  135. finalNum += len(values)
  136. for _, value := range values {
  137. err := enc.Encode(value)
  138. if err != nil {
  139. log.Fatalf("json.Encode(...): %s\n", err)
  140. }
  141. }
  142. if opts.Verbose {
  143. log.Printf(
  144. "thtread(main) %.2f%%, sument=%d",
  145. float32(finalNum)/float32(len(ids))*100.,
  146. finalNum,
  147. )
  148. }
  149. }
  150. if opts.Verbose {
  151. rm := c.API.RequestsMade()
  152. log.Printf(
  153. "Summarized got %d %s\n",
  154. finalNum, g.GetNameMul(),
  155. )
  156. log.Printf(
  157. "Made %d requests in process\n",
  158. rm,
  159. )
  160. took := time.Since(now).Seconds()
  161. log.Printf(
  162. "Took %f seconds\n",
  163. took,
  164. )
  165. log.Printf("RPS = %f\n", float64(rm)/took)
  166. }
  167. }