diff --git a/.gitignore b/.gitignore index 93a8447..8d669ae 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ secret.json *.json /exe/ +/tmp/ diff --git a/amocrm.go b/amocrm.go index 1127aff..efeb83f 100644 --- a/amocrm.go +++ b/amocrm.go @@ -14,10 +14,7 @@ type Contact = contacts.Contact type Event = events.Event const ( - // Maximum entities for once. - // It is set to be the most effective - // but in fact can be greater. - MaxEnt = 50 + MEPR = api.MaxEntitiesPerRequest ) type Client struct { diff --git a/cmd/amocli/common.go b/cmd/amocli/common.go new file mode 100644 index 0000000..f435560 --- /dev/null +++ b/cmd/amocli/common.go @@ -0,0 +1,194 @@ +package main + +import "surdeus.su/core/cli/mtool" +import "surdeus.su/core/amo" +import "sync" +import "os" +import "strconv" +import "log" +import "bufio" +import "math" + +type DefaultFlags struct { + SecretPath string + Threads int + Verbose bool + MRPS int + MEPR int + + All bool + StartPage int + EndPage int +} + +func MakeDefaultFlags(opts *DefaultFlags, flags *mtool.Flags) { + flags.StringVar( + &opts.SecretPath, + "secret", + "secret.json", + "path to JSON file with AMO CRM secrets", + "AMO_SECRET", + ) + flags.IntVar( + &opts.MRPS, + "mrps", + 9, + "maximum requests per second", + ) + flags.IntVar( + &opts.Threads, + "threads", + 5, + "amount of threads to run the requests", + ) + flags.BoolVar( + &opts.Verbose, + "no-verbose", + true, + "disable verbose mode", + ) + flags.IntVar( + &opts.MEPR, + "mepr", + amo.MEPR, + "max entities per request", + ) +} + +func MakeGetterFlags( + opts *DefaultFlags, + flags *mtool.Flags, +) { + flags.BoolVar( + &opts.All, + "all", + false, + "get all leads", + ) + flags.IntVar( + &opts.StartPage, + "startpage", + 1, + "the page to start at (works only with -all)", + ) + flags.IntVar( + &opts.EndPage, + "endpage", + math.MaxInt, + "the page to end the requests", + ) +} + +// Run function for slice's parts in different threads. +// Returns a channel that ticks that the threads finished. +func RunForSliceInThreads[V any]( + threadN, mepr int, // Thread amount and MERP. + slice []V, + // The function that takes + // the thread number and the slice of slice. + fn func(int, []V), +) (chan struct{}) { + ret := make(chan struct{}) + + // No need for threads if + // there are so few entities. + if len(slice) <= mepr { + go func() { + fn(1, slice) + ret <- struct{}{} + }() + return ret + } + + var wg sync.WaitGroup + + runFn :=func(thread int, s []V) { + defer wg.Done() + iterN := (len(s) / mepr) + 1 + for j := 0 ; j len(s) { + end = len(s) + } + if len(s[start:end]) == 0 { + continue + } + fn(thread, s[start:end]) + } + } + // Maximizing speed on small data. + + + //threadSize := len(slice) / threadN + threadMeprRest := len(slice) % mepr + //threadSize := (len(slice)-threadMeprRest)/threadN + preThreadSize := + ((len(slice)-threadMeprRest)/threadN) + threadSize := (preThreadSize/mepr+1)*mepr + + if threadSize < mepr { + threadSize = mepr + threadN = len(slice) / mepr + runFn = func(thread int, s []V) { + defer wg.Done() + fn(thread, s) + } + } + + for i := 0 ; i len(slice) { + last = len(slice) + } + + // Got an empty slice. + if len(slice[first:last]) == 0 { + break + } + + wg.Add(1) + go runFn(i+1, slice[first:last]) + } + + go func() { + wg.Wait() + ret <- struct{}{} + }() + + return ret +} + +func ReadIDs(idStrs []string, flags *mtool.Flags) []int { + var ids []int + if len(idStrs) > 0 { + ids = make([]int, 0, len(idStrs)) + for _, idStr := range idStrs { + id, err := strconv.Atoi(idStr) + if err != nil { + log.Printf("Error: Atoi(%q): %s\n", err) + continue + } + ids = append(ids, id) + } + } else { + ids = make([]int, 0) + scanner := bufio.NewScanner(os.Stdin) + for scanner.Scan() { + txt := scanner.Text() + + id, err := strconv.Atoi(txt) + if err != nil { + log.Printf( + "strconv.Atoi(%q): %s\n", + txt, err, + ) + continue + } + + ids = append(ids, id) + } + } + return ids +} diff --git a/cmd/amocli/getlead.go b/cmd/amocli/getlead.go index b5c9c22..ad72950 100644 --- a/cmd/amocli/getlead.go +++ b/cmd/amocli/getlead.go @@ -5,56 +5,136 @@ import "surdeus.su/core/amo" import "surdeus.su/core/ss/urlenc" import "surdeus.su/core/cli/mtool" import "encoding/json" -import "strconv" import "log" import "fmt" +import "time" +import "os" +//import "sync" var getLead = mtool.T("get-leads").Func(func(flags *mtool.Flags){ var ( - secretPath string + opts DefaultFlags ) - flags.StringVar( - &secretPath, - "secret", - "", - "path to JSON file with AMO CRM secrets", - "AMO_SECRET", - ) + now := time.Now() + MakeDefaultFlags(&opts, flags) + MakeGetterFlags(&opts, flags) + idStrs := flags.Parse() - ids := make([]int, len(idStrs)) - for i, idStr := range idStrs { - var err error - ids[i], err = strconv.Atoi(idStr) - if err != nil { - log.Printf("Error: Atoi(%q): %s\n", err) - continue - } - } - - c, err := amo.NewClient(secretPath) + c, err := amo.NewClient(opts.SecretPath) if err != nil { log.Fatalf("NewAmoClient(...): %s\n", err) } - c.API.SetMRPS(8) + c.API.SetMRPS(opts.MRPS) + finalLeads := []amo.Lead{} - leads, err := c.GetLeads( - urlenc.Array[int]{ - "id", - ids, - }, - urlenc.Value[string]{ - "with", - "contacts", - }, - ) - if err != nil { - log.Fatalf("GetLeadsByIDs(...): %s\n", err) + if opts.All { + page := opts.StartPage + leads, next, err := c.GetLeads( + urlenc.Value[int]{"page", page}, + urlenc.Value[int]{"limit", opts.MEPR}, + ) + if err != nil { + log.Fatalf("amo.GetLeads(...): %s\n", err) + } + finalLeads = append(finalLeads, leads...) + if opts.Verbose { + log.Printf("Got %d leads (%d, page %d)\n", + len(leads), len(finalLeads), page) + } + page++ + for page <= opts.EndPage && next != nil { + leads, next, err = next() + if err != nil { + log.Fatalf("amo.GetLeads(...): %s\n", err) + } + finalLeads = append(finalLeads, leads...) + if opts.Verbose { + log.Printf("Got %d leads (%d, page %d)\n", + len(leads), len(finalLeads), page) + } + page++ + } + + bts, err := json.MarshalIndent(finalLeads, "", " ") + if err != nil { + log.Fatalf("json.MarshalIndent(...) %s\n", err) + } + os.Stdout.Write(bts) + return } - bts, err := json.MarshalIndent(leads, "", " ") + ids := ReadIDs(idStrs, flags) + if len(ids) == 0 { + log.Fatalf("Got no IDs to read leads") + return + } + + leadChan := make(chan []amo.Lead) + finish := RunForSliceInThreads[int]( + opts.Threads, opts.MEPR, + ids, func(thread int, s []int){ + leads, _, err := c.GetLeads( + urlenc.Array[int]{ + "id", + s, + }, + urlenc.Value[string]{ + "with", + "contacts", + }, + urlenc.Value[int]{ + "limit", + opts.MEPR, + }, + ) + if err != nil { + log.Printf("GetLeadsByIDs(...): %s\n", err) + } + leadChan <- leads + if opts.Verbose { + log.Printf( + "%d: Got %d leads\n", + thread, len(leads), + ) + } + }, + ) + + //var wg sync.WaitGroup + go func(){ + // Waiting for appending so we do not lose data. + <-finish + for len(leadChan) > 0 {} + close(leadChan) + }() + + for leads := range leadChan { + finalLeads = append(finalLeads, leads...) + } + + if opts.Verbose { + rm := c.API.RequestsMade() + log.Printf( + "Summarized got %d leads\n", + len(finalLeads), + ) + log.Printf( + "Made %d requests in process\n", + rm, + ) + took := time.Since(now).Seconds() + log.Printf( + "Took %f seconds\n", + took, + ) + log.Printf("RPS = %f\n", float64(rm)/took) + } + bts, err := json.MarshalIndent(finalLeads, "", " ") if err != nil { log.Fatalf("json.Marshal(...): %s\n", err) } fmt.Printf("%s\n", bts) -}) +}).Usage( + "[id1 id2 ... idN]", +) diff --git a/cmd/amocli/updatecom.go b/cmd/amocli/updatecom.go index 8c6d2fd..3c63cf7 100644 --- a/cmd/amocli/updatecom.go +++ b/cmd/amocli/updatecom.go @@ -1,29 +1,24 @@ package main import "surdeus.su/core/amo" +import "surdeus.su/core/amo/api" import "surdeus.su/core/cli/mtool" import "surdeus.su/core/amo/companies" import "encoding/json" import "log" import "os" +const MEPR = api.MaxEntitiesPerRequest + var updateComs = mtool.T("update-coms").Func(func(flags *mtool.Flags){ var ( - secretPath string + opts DefaultFlags ) - - flags.StringVar( - &secretPath, - "secret", - "", - "path to JSON file with AMO CRM secrets", - "AMO_SECRET", - ) - + MakeDefaultFlags(&opts, flags) flags.Parse() - client, err := amo.NewClient(secretPath) + client, err := amo.NewClient(opts.SecretPath) if err != nil { log.Fatalf("NewAmoClient(...): %s\n", err) } @@ -35,8 +30,27 @@ mtool.T("update-coms").Func(func(flags *mtool.Flags){ log.Fatalf("json.Decode(...): %s\n", err) } - err = client.UpdateCompanies(cs) - if err != nil { - log.Fatalf("client.UpdateCompanies(...): %s\n", err) + n := len(cs)/MEPR + 1 + for i:=0 ; i= len(cs) { + end = len(cs) + } + if len(cs) == 0 { + continue + } + slice := cs[start:end] + if opts.Verbose { + log.Printf("Updating %d companies...\n", len(slice)) + } + err = client.UpdateCompanies(slice) + if err != nil { + log.Fatalf( + "client.UpdateCompanies(...) len(...) =%d: %s\n", + len(slice), + err, + ) + } } }) diff --git a/install.sh b/install.sh new file mode 100755 index 0000000..af99f21 --- /dev/null +++ b/install.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +go install ./cmd/amocli/ diff --git a/leads.go b/leads.go index 5346f96..e28a6c3 100644 --- a/leads.go +++ b/leads.go @@ -5,37 +5,46 @@ import "surdeus.su/core/amo/leads" import "surdeus.su/core/ss/urlenc" import "errors" import "fmt" +//import "log" // Get list of leads. func (client *Client) GetLeads( opts ...urlenc.Builder, -) ([]leads.Lead, error) { +) ([]Lead, NextFunc[[]Lead], error) { res := fmt.Sprintf( "/api/v4/leads?%s", urlenc.Join(opts...).Encode(), ) - ret := []leads.Lead{} + return client.GetLeadsByURL(res) +} - for { - var lds leads.Leads - err := client.API.Get(res, &lds) - if err != nil { - if errors.Is(err, api.ErrNoContent) { - break - } - return nil, err +func (client *Client) GetLeadsByURL( + u string, +) ([]Lead, NextFunc[[]Lead], error) { + var fn NextFunc[[]Lead] + + lds := leads.Leads{} + err := client.API.Get(u, &lds) + if err != nil { + // Check for empty. + if errors.Is(err, api.ErrNoContent) { + return nil, nil, nil } - - ret = append(ret, lds.Embedded.Leads...) - - if lds.Links.Next.Href == "" { - break - } - res = lds.Links.Next.Href - + // Some other error. + return nil, nil, err } - return ret, nil + + nextHref := lds.Links.Next.Href + if nextHref != "" { + fn = MakeNextFunc( + nextHref, + client.GetLeadsByURL, + ) + } + + return lds.Embedded.Leads, fn, nil + } // Get lead with the specified ID. diff --git a/leads/leads.go b/leads/leads.go index 8969850..3f5ad48 100644 --- a/leads/leads.go +++ b/leads/leads.go @@ -88,3 +88,4 @@ type Embedded struct { Companies []*Companies `json:"companies"` Contacts Contacts `json:"contacts"` } + diff --git a/next.go b/next.go new file mode 100644 index 0000000..db05ce5 --- /dev/null +++ b/next.go @@ -0,0 +1,13 @@ +package amo + +type NextFunc[V any] func() (V, NextFunc[V], error) + +func MakeNextFunc[V any]( + href string, + fn func(string) (V, NextFunc[V], error), +) NextFunc[V] { + return NextFunc[V](func() (V, NextFunc[V], error){ + return fn(href) + }) +} +