feat: reimplementing the API.

This commit is contained in:
Andrey Parhomenko 2024-06-05 01:16:12 +05:00
parent 546600a027
commit baf3d06c3e
9 changed files with 384 additions and 72 deletions

1
.gitignore vendored
View file

@ -4,3 +4,4 @@
secret.json
*.json
/exe/
/tmp/

View file

@ -14,10 +14,7 @@ type Contact = contacts.Contact
type Event = events.Event
const (
// Maximum entities for once.
// It is set to be the most effective
// but in fact can be greater.
MaxEnt = 50
MEPR = api.MaxEntitiesPerRequest
)
type Client struct {

194
cmd/amocli/common.go Normal file
View file

@ -0,0 +1,194 @@
package main
import "surdeus.su/core/cli/mtool"
import "surdeus.su/core/amo"
import "sync"
import "os"
import "strconv"
import "log"
import "bufio"
import "math"
type DefaultFlags struct {
SecretPath string
Threads int
Verbose bool
MRPS int
MEPR int
All bool
StartPage int
EndPage int
}
func MakeDefaultFlags(opts *DefaultFlags, flags *mtool.Flags) {
flags.StringVar(
&opts.SecretPath,
"secret",
"secret.json",
"path to JSON file with AMO CRM secrets",
"AMO_SECRET",
)
flags.IntVar(
&opts.MRPS,
"mrps",
9,
"maximum requests per second",
)
flags.IntVar(
&opts.Threads,
"threads",
5,
"amount of threads to run the requests",
)
flags.BoolVar(
&opts.Verbose,
"no-verbose",
true,
"disable verbose mode",
)
flags.IntVar(
&opts.MEPR,
"mepr",
amo.MEPR,
"max entities per request",
)
}
func MakeGetterFlags(
opts *DefaultFlags,
flags *mtool.Flags,
) {
flags.BoolVar(
&opts.All,
"all",
false,
"get all leads",
)
flags.IntVar(
&opts.StartPage,
"startpage",
1,
"the page to start at (works only with -all)",
)
flags.IntVar(
&opts.EndPage,
"endpage",
math.MaxInt,
"the page to end the requests",
)
}
// Run function for slice's parts in different threads.
// Returns a channel that ticks that the threads finished.
func RunForSliceInThreads[V any](
threadN, mepr int, // Thread amount and MERP.
slice []V,
// The function that takes
// the thread number and the slice of slice.
fn func(int, []V),
) (chan struct{}) {
ret := make(chan struct{})
// No need for threads if
// there are so few entities.
if len(slice) <= mepr {
go func() {
fn(1, slice)
ret <- struct{}{}
}()
return ret
}
var wg sync.WaitGroup
runFn :=func(thread int, s []V) {
defer wg.Done()
iterN := (len(s) / mepr) + 1
for j := 0 ; j<iterN ; j++ {
start := j*mepr
end := start + mepr
if end > len(s) {
end = len(s)
}
if len(s[start:end]) == 0 {
continue
}
fn(thread, s[start:end])
}
}
// Maximizing speed on small data.
//threadSize := len(slice) / threadN
threadMeprRest := len(slice) % mepr
//threadSize := (len(slice)-threadMeprRest)/threadN
preThreadSize :=
((len(slice)-threadMeprRest)/threadN)
threadSize := (preThreadSize/mepr+1)*mepr
if threadSize < mepr {
threadSize = mepr
threadN = len(slice) / mepr
runFn = func(thread int, s []V) {
defer wg.Done()
fn(thread, s)
}
}
for i := 0 ; i<threadN ; i++ {
first := i * threadSize
last := first + threadSize
if last > len(slice) {
last = len(slice)
}
// Got an empty slice.
if len(slice[first:last]) == 0 {
break
}
wg.Add(1)
go runFn(i+1, slice[first:last])
}
go func() {
wg.Wait()
ret <- struct{}{}
}()
return ret
}
func ReadIDs(idStrs []string, flags *mtool.Flags) []int {
var ids []int
if len(idStrs) > 0 {
ids = make([]int, 0, len(idStrs))
for _, idStr := range idStrs {
id, err := strconv.Atoi(idStr)
if err != nil {
log.Printf("Error: Atoi(%q): %s\n", err)
continue
}
ids = append(ids, id)
}
} else {
ids = make([]int, 0)
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
txt := scanner.Text()
id, err := strconv.Atoi(txt)
if err != nil {
log.Printf(
"strconv.Atoi(%q): %s\n",
txt, err,
)
continue
}
ids = append(ids, id)
}
}
return ids
}

View file

@ -5,56 +5,136 @@ import "surdeus.su/core/amo"
import "surdeus.su/core/ss/urlenc"
import "surdeus.su/core/cli/mtool"
import "encoding/json"
import "strconv"
import "log"
import "fmt"
import "time"
import "os"
//import "sync"
var getLead = mtool.T("get-leads").Func(func(flags *mtool.Flags){
var (
secretPath string
opts DefaultFlags
)
flags.StringVar(
&secretPath,
"secret",
"",
"path to JSON file with AMO CRM secrets",
"AMO_SECRET",
)
now := time.Now()
MakeDefaultFlags(&opts, flags)
MakeGetterFlags(&opts, flags)
idStrs := flags.Parse()
ids := make([]int, len(idStrs))
for i, idStr := range idStrs {
var err error
ids[i], err = strconv.Atoi(idStr)
if err != nil {
log.Printf("Error: Atoi(%q): %s\n", err)
continue
}
}
c, err := amo.NewClient(secretPath)
c, err := amo.NewClient(opts.SecretPath)
if err != nil {
log.Fatalf("NewAmoClient(...): %s\n", err)
}
c.API.SetMRPS(8)
c.API.SetMRPS(opts.MRPS)
finalLeads := []amo.Lead{}
leads, err := c.GetLeads(
if opts.All {
page := opts.StartPage
leads, next, err := c.GetLeads(
urlenc.Value[int]{"page", page},
urlenc.Value[int]{"limit", opts.MEPR},
)
if err != nil {
log.Fatalf("amo.GetLeads(...): %s\n", err)
}
finalLeads = append(finalLeads, leads...)
if opts.Verbose {
log.Printf("Got %d leads (%d, page %d)\n",
len(leads), len(finalLeads), page)
}
page++
for page <= opts.EndPage && next != nil {
leads, next, err = next()
if err != nil {
log.Fatalf("amo.GetLeads(...): %s\n", err)
}
finalLeads = append(finalLeads, leads...)
if opts.Verbose {
log.Printf("Got %d leads (%d, page %d)\n",
len(leads), len(finalLeads), page)
}
page++
}
bts, err := json.MarshalIndent(finalLeads, "", " ")
if err != nil {
log.Fatalf("json.MarshalIndent(...) %s\n", err)
}
os.Stdout.Write(bts)
return
}
ids := ReadIDs(idStrs, flags)
if len(ids) == 0 {
log.Fatalf("Got no IDs to read leads")
return
}
leadChan := make(chan []amo.Lead)
finish := RunForSliceInThreads[int](
opts.Threads, opts.MEPR,
ids, func(thread int, s []int){
leads, _, err := c.GetLeads(
urlenc.Array[int]{
"id",
ids,
s,
},
urlenc.Value[string]{
"with",
"contacts",
},
urlenc.Value[int]{
"limit",
opts.MEPR,
},
)
if err != nil {
log.Fatalf("GetLeadsByIDs(...): %s\n", err)
log.Printf("GetLeadsByIDs(...): %s\n", err)
}
leadChan <- leads
if opts.Verbose {
log.Printf(
"%d: Got %d leads\n",
thread, len(leads),
)
}
},
)
//var wg sync.WaitGroup
go func(){
// Waiting for appending so we do not lose data.
<-finish
for len(leadChan) > 0 {}
close(leadChan)
}()
for leads := range leadChan {
finalLeads = append(finalLeads, leads...)
}
bts, err := json.MarshalIndent(leads, "", " ")
if opts.Verbose {
rm := c.API.RequestsMade()
log.Printf(
"Summarized got %d leads\n",
len(finalLeads),
)
log.Printf(
"Made %d requests in process\n",
rm,
)
took := time.Since(now).Seconds()
log.Printf(
"Took %f seconds\n",
took,
)
log.Printf("RPS = %f\n", float64(rm)/took)
}
bts, err := json.MarshalIndent(finalLeads, "", " ")
if err != nil {
log.Fatalf("json.Marshal(...): %s\n", err)
}
fmt.Printf("%s\n", bts)
})
}).Usage(
"[id1 id2 ... idN]",
)

View file

@ -1,29 +1,24 @@
package main
import "surdeus.su/core/amo"
import "surdeus.su/core/amo/api"
import "surdeus.su/core/cli/mtool"
import "surdeus.su/core/amo/companies"
import "encoding/json"
import "log"
import "os"
const MEPR = api.MaxEntitiesPerRequest
var updateComs =
mtool.T("update-coms").Func(func(flags *mtool.Flags){
var (
secretPath string
opts DefaultFlags
)
flags.StringVar(
&secretPath,
"secret",
"",
"path to JSON file with AMO CRM secrets",
"AMO_SECRET",
)
MakeDefaultFlags(&opts, flags)
flags.Parse()
client, err := amo.NewClient(secretPath)
client, err := amo.NewClient(opts.SecretPath)
if err != nil {
log.Fatalf("NewAmoClient(...): %s\n", err)
}
@ -35,8 +30,27 @@ mtool.T("update-coms").Func(func(flags *mtool.Flags){
log.Fatalf("json.Decode(...): %s\n", err)
}
err = client.UpdateCompanies(cs)
n := len(cs)/MEPR + 1
for i:=0 ; i<n ; i++ {
start := i * MEPR
end := start + MEPR
if end >= len(cs) {
end = len(cs)
}
if len(cs) == 0 {
continue
}
slice := cs[start:end]
if opts.Verbose {
log.Printf("Updating %d companies...\n", len(slice))
}
err = client.UpdateCompanies(slice)
if err != nil {
log.Fatalf("client.UpdateCompanies(...): %s\n", err)
log.Fatalf(
"client.UpdateCompanies(...) len(...) =%d: %s\n",
len(slice),
err,
)
}
}
})

3
install.sh Executable file
View file

@ -0,0 +1,3 @@
#!/bin/sh
go install ./cmd/amocli/

View file

@ -5,37 +5,46 @@ import "surdeus.su/core/amo/leads"
import "surdeus.su/core/ss/urlenc"
import "errors"
import "fmt"
//import "log"
// Get list of leads.
func (client *Client) GetLeads(
opts ...urlenc.Builder,
) ([]leads.Lead, error) {
) ([]Lead, NextFunc[[]Lead], error) {
res := fmt.Sprintf(
"/api/v4/leads?%s",
urlenc.Join(opts...).Encode(),
)
ret := []leads.Lead{}
return client.GetLeadsByURL(res)
}
for {
var lds leads.Leads
err := client.API.Get(res, &lds)
func (client *Client) GetLeadsByURL(
u string,
) ([]Lead, NextFunc[[]Lead], error) {
var fn NextFunc[[]Lead]
lds := leads.Leads{}
err := client.API.Get(u, &lds)
if err != nil {
// Check for empty.
if errors.Is(err, api.ErrNoContent) {
break
return nil, nil, nil
}
return nil, err
// Some other error.
return nil, nil, err
}
ret = append(ret, lds.Embedded.Leads...)
if lds.Links.Next.Href == "" {
break
nextHref := lds.Links.Next.Href
if nextHref != "" {
fn = MakeNextFunc(
nextHref,
client.GetLeadsByURL,
)
}
res = lds.Links.Next.Href
}
return ret, nil
return lds.Embedded.Leads, fn, nil
}
// Get lead with the specified ID.

View file

@ -88,3 +88,4 @@ type Embedded struct {
Companies []*Companies `json:"companies"`
Contacts Contacts `json:"contacts"`
}

13
next.go Normal file
View file

@ -0,0 +1,13 @@
package amo
type NextFunc[V any] func() (V, NextFunc[V], error)
func MakeNextFunc[V any](
href string,
fn func(string) (V, NextFunc[V], error),
) NextFunc[V] {
return NextFunc[V](func() (V, NextFunc[V], error){
return fn(href)
})
}