package bstore import ( "fmt" "reflect" ) // isZero returns whether v is the zero value for the fields that we store. // reflect.IsZero cannot be used on structs because it checks private fields as well. func (ft fieldType) isZero(v reflect.Value) bool { if !v.IsValid() { return true } if ft.Ptr { return v.IsZero() } switch ft.Kind { case kindStruct: for _, f := range ft.structFields { if !f.Type.isZero(v.FieldByIndex(f.structField.Index)) { return false } } return true } // Use standard IsZero otherwise, also for kindBinaryMarshal. return v.IsZero() } // We ensure nonzero constraints when opening a database. An updated schema, with // added nonzero constraints, can mean all records have to be checked. With cyclic // types, we have to take care not to recurse, and for efficiency we want to only // check fields/types that are affected. Steps: // // - Go through each field of the struct, and recurse into the field types, // gathering the types and newly nonzero fields. // - Propagate the need for nonzero checks to types that reference the changed // types. // - By now, if there was a new nonzero constraint, the top-level type will be // marked as needing a check, so we'll read through all records and check all the // immediate newly nonzero fields of a type, and recurse into fields of types that // are marked as needing a check. // nonzeroCheckType is tracked per reflect.Type that has been analysed (always the // non-pointer type, i.e. a pointer is dereferenced). These types can be cyclic. We // gather them for all types involved, including map and slice types and basic // types, but "newlyNonzero" and "fields" will only be set for structs. type nonzeroCheckType struct { needsCheck bool newlyNonzero []field // Fields in this type that have a new nonzero constraint themselves. fields []field // All fields in a struct type. // Types that reference this type. Used to propagate needsCheck to the top. referencedBy map[reflect.Type]struct{} } func (ct *nonzeroCheckType) markRefBy(t reflect.Type) { if t != nil { ct.referencedBy[t] = struct{}{} } } // checkNonzero compares ofields (optional previous type schema) and nfields (new // type schema) for nonzero struct tags. If an existing field has a new nonzero // constraint, we verify that there are indeed no nonzero values in the existing // records. If there are, we return ErrZero. checkNonzero looks at (potentially // cyclic) types referenced by fields. func (tx *Tx) checkNonzero(st storeType, tv *typeVersion, ofields, nfields []field) error { // Gather all new nonzero constraints on fields. m := map[reflect.Type]*nonzeroCheckType{} nonzeroCheckGather(m, st.Type, nil, ofields, nfields) // Propagate the need for a check on all types due to a referenced type having a // new nonzero constraint. // todo: this can probably be done more elegantly, with fewer graph walks... for t, ct := range m { if ct.needsCheck { nonzeroCheckPropagate(m, t, t, ct) } } // If needsCheck wasn't propagated to the top-level, there was no new nonzero // constraint, and we're not going to read all the data. This is the common case // when opening a database. if !m[st.Type].needsCheck { return nil } // Read through all data, and check the new nonzero constraint. // todo optimize: if there are only top-level fields to check, and we have indices on those fields, we can use the index to check this without reading all data. return checkNonzeroRecords(tx, st, tv, m) } // Walk down fields, gathering their types (including those they reference), and // marking needsCheck if any of a type's immediate field has a new nonzero // constraint. The need for a check is not propagated to referencing types by this // function. func nonzeroCheckGather(m map[reflect.Type]*nonzeroCheckType, t, refBy reflect.Type, ofields, nfields []field) { ct := m[t] if ct != nil { // Already gathered, don't recurse, for cyclic types. ct.markRefBy(refBy) return } ct = &nonzeroCheckType{ fields: nfields, referencedBy: map[reflect.Type]struct{}{}, } ct.markRefBy(refBy) m[t] = ct for _, f := range nfields { // Check if this field is newly nonzero. var of *field for i := range ofields { if f.Name == ofields[i].Name { of = &ofields[i] // Compare with existing field. if f.Nonzero && !of.Nonzero { ct.newlyNonzero = append(ct.newlyNonzero, f) ct.needsCheck = true } break } } // Check if this is a new field entirely, with nonzero constraint. if of == nil && f.Nonzero { ct.newlyNonzero = append(ct.newlyNonzero, f) ct.needsCheck = true } // Descend into referenced types, adding references back to this type. var oft *fieldType if of != nil { oft = &of.Type } ft := f.structField.Type nonzeroCheckGatherFieldType(m, ft, t, oft, f.Type) } } // gather new nonzero constraints for type "t", which is referenced by "refBy" (and // will be marked as such). type "t" is described by "nft" and optionally // previously by "oft". func nonzeroCheckGatherFieldType(m map[reflect.Type]*nonzeroCheckType, t, refBy reflect.Type, oft *fieldType, nft fieldType) { // If this is a pointer type, dereference the reflect type. if nft.Ptr { t = t.Elem() } if nft.Kind == kindStruct { var fofields []field if oft != nil { fofields = oft.structFields } nonzeroCheckGather(m, t, refBy, fofields, nft.structFields) } // Mark this type as gathered, so we don't process it again if we recurse. ct := m[t] if ct != nil { ct.markRefBy(refBy) return } ct = &nonzeroCheckType{ fields: nft.structFields, referencedBy: map[reflect.Type]struct{}{}, } ct.markRefBy(refBy) m[t] = ct switch nft.Kind { case kindMap: var koft, voft *fieldType if oft != nil { koft = oft.MapKey voft = oft.MapValue } nonzeroCheckGatherFieldType(m, t.Key(), t, koft, *nft.MapKey) nonzeroCheckGatherFieldType(m, t.Elem(), t, voft, *nft.MapValue) case kindSlice: var loft *fieldType if oft != nil { loft = oft.ListElem } nonzeroCheckGatherFieldType(m, t.Elem(), t, loft, *nft.ListElem) case kindArray: var loft *fieldType if oft != nil { loft = oft.ListElem } nonzeroCheckGatherFieldType(m, t.Elem(), t, loft, *nft.ListElem) } } // Propagate that type "t" is affected by a new nonzero constrained and needs to be // checked. The types referencing "t" are in ct.referencedBy. "origt" is the // starting type for this propagation. func nonzeroCheckPropagate(m map[reflect.Type]*nonzeroCheckType, origt, t reflect.Type, ct *nonzeroCheckType) { for rt := range ct.referencedBy { if rt == origt { continue // End recursion. } m[rt].needsCheck = true nonzeroCheckPropagate(m, origt, rt, m[rt]) } } // checkNonzeroPaths reads through all records of a type, and checks that the fields // indicated by paths are nonzero. If not, ErrZero is returned. func checkNonzeroRecords(tx *Tx, st storeType, tv *typeVersion, m map[reflect.Type]*nonzeroCheckType) error { rb, err := tx.recordsBucket(st.Current.name, st.Current.fillPercent) if err != nil { return err } ctxDone := tx.ctx.Done() return rb.ForEach(func(bk, bv []byte) error { tx.stats.Records.Cursor++ select { case <-ctxDone: return tx.ctx.Err() default: } // todo optimize: instead of parsing the full record, use the fieldmap to see if the value is nonzero. rv, err := st.parseNew(bk, bv) if err != nil { return err } ct := m[st.Type] return checkNonzeroFields(m, st.Type, ct.newlyNonzero, ct.fields, rv) }) } // checkNonzeroFields checks that the newly nonzero fields of a struct value are // indeed nonzero, and walks down referenced types, checking the constraint. func checkNonzeroFields(m map[reflect.Type]*nonzeroCheckType, t reflect.Type, newlyNonzero, fields []field, rv reflect.Value) error { // Check the newly nonzero fields. for _, f := range newlyNonzero { frv := rv.FieldByIndex(f.structField.Index) if f.Type.isZero(frv) { return fmt.Errorf("%w: field %q", ErrZero, f.Name) } } // Descend into referenced types. for _, f := range fields { switch f.Type.Kind { case kindMap, kindSlice, kindStruct, kindArray: ft := f.structField.Type if err := checkNonzeroFieldType(m, f.Type, ft, rv.FieldByIndex(f.structField.Index)); err != nil { return err } } } return nil } // checkNonzeroFieldType walks down a value, and checks that its (struct) types // don't violate nonzero constraints. // Does not check whether the value itself is nonzero. If required, that has // already been checked. func checkNonzeroFieldType(m map[reflect.Type]*nonzeroCheckType, ft fieldType, t reflect.Type, rv reflect.Value) error { if ft.Ptr { t = t.Elem() } if !m[t].needsCheck { return nil } if ft.Ptr && rv.IsZero() { return nil } if ft.Ptr { rv = rv.Elem() } unptr := func(t reflect.Type, ptr bool) reflect.Type { if ptr { return t.Elem() } return t } switch ft.Kind { case kindMap: kt := t.Key() vt := t.Elem() checkKey := m[unptr(kt, ft.MapKey.Ptr)].needsCheck checkValue := m[unptr(vt, ft.MapValue.Ptr)].needsCheck iter := rv.MapRange() for iter.Next() { if checkKey { if err := checkNonzeroFieldType(m, *ft.MapKey, kt, iter.Key()); err != nil { return err } } if checkValue { if err := checkNonzeroFieldType(m, *ft.MapValue, vt, iter.Value()); err != nil { return err } } } case kindSlice: et := t.Elem() n := rv.Len() for i := 0; i < n; i++ { if err := checkNonzeroFieldType(m, *ft.ListElem, et, rv.Index(i)); err != nil { return err } } case kindArray: et := t.Elem() n := ft.ArrayLength for i := 0; i < n; i++ { if err := checkNonzeroFieldType(m, *ft.ListElem, et, rv.Index(i)); err != nil { return err } } case kindStruct: ct := m[t] if err := checkNonzeroFields(m, t, ct.newlyNonzero, ct.fields, rv); err != nil { return err } } return nil }