package memcached

import (
	"encoding/binary"
	"fmt"
	"github.com/couchbase/gomemcached"
	"math"
)

type SystemEventType int

const InvalidSysEvent SystemEventType = -1

const (
	CollectionCreate  SystemEventType = 0
	CollectionDrop    SystemEventType = iota
	CollectionFlush   SystemEventType = iota // KV did not implement
	ScopeCreate       SystemEventType = iota
	ScopeDrop         SystemEventType = iota
	CollectionChanged SystemEventType = iota
)

type ScopeCreateEvent interface {
	GetSystemEventName() (string, error)
	GetScopeId() (uint32, error)
	GetManifestId() (uint64, error)
}

type CollectionCreateEvent interface {
	GetSystemEventName() (string, error)
	GetScopeId() (uint32, error)
	GetCollectionId() (uint32, error)
	GetManifestId() (uint64, error)
	GetMaxTTL() (uint32, error)
}

type CollectionDropEvent interface {
	GetScopeId() (uint32, error)
	GetCollectionId() (uint32, error)
	GetManifestId() (uint64, error)
}

type ScopeDropEvent interface {
	GetScopeId() (uint32, error)
	GetManifestId() (uint64, error)
}

type CollectionChangedEvent interface {
	GetCollectionId() (uint32, error)
	GetManifestId() (uint64, error)
	GetMaxTTL() (uint32, error)
}

var ErrorInvalidOp error = fmt.Errorf("Invalid Operation")
var ErrorInvalidVersion error = fmt.Errorf("Invalid version for parsing")
var ErrorValueTooShort error = fmt.Errorf("Value length is too short")
var ErrorNoMaxTTL error = fmt.Errorf("This event has no max TTL")

// UprEvent memcached events for UPR streams.
type UprEvent struct {
	Opcode          gomemcached.CommandCode // Type of event
	Status          gomemcached.Status      // Response status
	VBucket         uint16                  // VBucket this event applies to
	DataType        uint8                   // data type
	Opaque          uint16                  // 16 MSB of opaque
	VBuuid          uint64                  // This field is set by downstream
	Flags           uint32                  // Item flags
	Expiry          uint32                  // Item expiration time
	Key, Value      []byte                  // Item key/value
	OldValue        []byte                  // TODO: TBD: old document value
	Cas             uint64                  // CAS value of the item
	Seqno           uint64                  // sequence number of the mutation
	RevSeqno        uint64                  // rev sequence number : deletions
	LockTime        uint32                  // Lock time
	MetadataSize    uint16                  // Metadata size
	SnapstartSeq    uint64                  // start sequence number of this snapshot
	SnapendSeq      uint64                  // End sequence number of the snapshot
	SnapshotType    uint32                  // 0: disk 1: memory
	FailoverLog     *FailoverLog            // Failover log containing vvuid and sequnce number
	Error           error                   // Error value in case of a failure
	ExtMeta         []byte                  // Extended Metadata
	AckSize         uint32                  // The number of bytes that can be Acked to DCP
	SystemEvent     SystemEventType         // Only valid if IsSystemEvent() is true
	SysEventVersion uint8                   // Based on the version, the way Extra bytes is parsed is different
	ValueLen        int                     // Cache it to avoid len() calls for performance
	CollectionId    uint32                  // Valid if Collection is in use
	StreamId        *uint16                 // Nil if not in use
}

// FailoverLog containing vvuid and sequnce number
type FailoverLog [][2]uint64

// Containing a pair of vbno and the high seqno
type VBSeqnos [][2]uint64

func makeUprEvent(rq gomemcached.MCRequest, stream *UprStream, bytesReceivedFromDCP int) *UprEvent {
	event := &UprEvent{
		Opcode:       rq.Opcode,
		VBucket:      stream.Vbucket,
		VBuuid:       stream.Vbuuid,
		Value:        rq.Body,
		Cas:          rq.Cas,
		ExtMeta:      rq.ExtMeta,
		DataType:     rq.DataType,
		ValueLen:     len(rq.Body),
		SystemEvent:  InvalidSysEvent,
		CollectionId: math.MaxUint32,
	}

	event.PopulateFieldsBasedOnStreamType(rq, stream.StreamType)

	// set AckSize for events that need to be acked to DCP,
	// i.e., events with CommandCodes that need to be buffered in DCP
	if _, ok := gomemcached.BufferedCommandCodeMap[rq.Opcode]; ok {
		event.AckSize = uint32(bytesReceivedFromDCP)
	}

	// 16 LSBits are used by client library to encode vbucket number.
	// 16 MSBits are left for application to multiplex on opaque value.
	event.Opaque = appOpaque(rq.Opaque)

	if len(rq.Extras) >= uprMutationExtraLen &&
		event.Opcode == gomemcached.UPR_MUTATION {

		event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
		event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16])
		event.Flags = binary.BigEndian.Uint32(rq.Extras[16:20])
		event.Expiry = binary.BigEndian.Uint32(rq.Extras[20:24])
		event.LockTime = binary.BigEndian.Uint32(rq.Extras[24:28])
		event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[28:30])

	} else if len(rq.Extras) >= uprDeletetionWithDeletionTimeExtraLen &&
		event.Opcode == gomemcached.UPR_DELETION {

		event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
		event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16])
		event.Expiry = binary.BigEndian.Uint32(rq.Extras[16:20])

	} else if len(rq.Extras) >= uprDeletetionExtraLen &&
		event.Opcode == gomemcached.UPR_DELETION ||
		event.Opcode == gomemcached.UPR_EXPIRATION {

		event.Seqno = binary.BigEndian.Uint64(rq.Extras[:8])
		event.RevSeqno = binary.BigEndian.Uint64(rq.Extras[8:16])
		event.MetadataSize = binary.BigEndian.Uint16(rq.Extras[16:18])

	} else if len(rq.Extras) >= uprSnapshotExtraLen &&
		event.Opcode == gomemcached.UPR_SNAPSHOT {

		event.SnapstartSeq = binary.BigEndian.Uint64(rq.Extras[:8])
		event.SnapendSeq = binary.BigEndian.Uint64(rq.Extras[8:16])
		event.SnapshotType = binary.BigEndian.Uint32(rq.Extras[16:20])
	} else if event.IsSystemEvent() {
		event.PopulateEvent(rq.Extras)
	} else if event.IsSeqnoAdv() {
		event.PopulateSeqnoAdv(rq.Extras)
	} else if event.IsOsoSnapshot() {
		event.PopulateOso(rq.Extras)
	}

	return event
}

func (event *UprEvent) PopulateFieldsBasedOnStreamType(rq gomemcached.MCRequest, streamType DcpStreamType) {
	switch streamType {
	case CollectionsStreamId:
		for _, extra := range rq.FramingExtras {
			streamId, streamIdErr := extra.GetStreamId()
			if streamIdErr == nil {
				event.StreamId = &streamId
			}
		}
		// After parsing streamID, still need to populate regular collectionID
		fallthrough
	case CollectionsNonStreamId:
		switch rq.Opcode {
		// Only these will have CID encoded within the key
		case gomemcached.UPR_MUTATION,
			gomemcached.UPR_DELETION,
			gomemcached.UPR_EXPIRATION:
			uleb128 := Uleb128(rq.Key)
			result, bytesShifted := uleb128.ToUint32(rq.Keylen)
			event.CollectionId = result
			event.Key = rq.Key[bytesShifted:]
		default:
			event.Key = rq.Key
		}
	case NonCollectionStream:
		// Let default behavior be legacy stream type
		fallthrough
	default:
		event.Key = rq.Key
	}
}

func (event *UprEvent) String() string {
	name := gomemcached.CommandNames[event.Opcode]
	if name == "" {
		name = fmt.Sprintf("#%d", event.Opcode)
	}
	return name
}

func (event *UprEvent) IsSnappyDataType() bool {
	return event.Opcode == gomemcached.UPR_MUTATION && (event.DataType&SnappyDataType > 0)
}

func (event *UprEvent) IsCollectionType() bool {
	return event.IsSystemEvent() || event.CollectionId <= math.MaxUint32
}

func (event *UprEvent) IsSystemEvent() bool {
	return event.Opcode == gomemcached.DCP_SYSTEM_EVENT
}

func (event *UprEvent) IsSeqnoAdv() bool {
	return event.Opcode == gomemcached.DCP_SEQNO_ADV
}

func (event *UprEvent) IsOsoSnapshot() bool {
	return event.Opcode == gomemcached.DCP_OSO_SNAPSHOT
}

func (event *UprEvent) PopulateEvent(extras []byte) {
	if len(extras) < dcpSystemEventExtraLen {
		// Wrong length, don't parse
		return
	}

	event.Seqno = binary.BigEndian.Uint64(extras[:8])
	event.SystemEvent = SystemEventType(binary.BigEndian.Uint32(extras[8:12]))
	var versionTemp uint16 = binary.BigEndian.Uint16(extras[12:14])
	event.SysEventVersion = uint8(versionTemp >> 8)
}

func (event *UprEvent) PopulateSeqnoAdv(extras []byte) {
	if len(extras) < dcpSeqnoAdvExtraLen {
		// Wrong length, don't parse
		return
	}

	event.Seqno = binary.BigEndian.Uint64(extras[:8])
}

func (event *UprEvent) PopulateOso(extras []byte) {
	if len(extras) < dcpOsoExtraLen {
		// Wrong length, don't parse
		return
	}
	event.Flags = binary.BigEndian.Uint32(extras[:4])
}

func (event *UprEvent) GetSystemEventName() (string, error) {
	switch event.SystemEvent {
	case CollectionCreate:
		fallthrough
	case ScopeCreate:
		return string(event.Key), nil
	default:
		return "", ErrorInvalidOp
	}
}

func (event *UprEvent) GetManifestId() (uint64, error) {
	switch event.SystemEvent {
	// Version 0 only checks
	case CollectionChanged:
		fallthrough
	case ScopeDrop:
		fallthrough
	case ScopeCreate:
		fallthrough
	case CollectionDrop:
		if event.SysEventVersion > 0 {
			return 0, ErrorInvalidVersion
		}
		fallthrough
	case CollectionCreate:
		// CollectionCreate supports version 1
		if event.SysEventVersion > 1 {
			return 0, ErrorInvalidVersion
		}
		if event.ValueLen < 8 {
			return 0, ErrorValueTooShort
		}
		return binary.BigEndian.Uint64(event.Value[0:8]), nil
	default:
		return 0, ErrorInvalidOp
	}
}

func (event *UprEvent) GetCollectionId() (uint32, error) {
	switch event.SystemEvent {
	case CollectionDrop:
		if event.SysEventVersion > 0 {
			return 0, ErrorInvalidVersion
		}
		fallthrough
	case CollectionCreate:
		if event.SysEventVersion > 1 {
			return 0, ErrorInvalidVersion
		}
		if event.ValueLen < 16 {
			return 0, ErrorValueTooShort
		}
		return binary.BigEndian.Uint32(event.Value[12:16]), nil
	case CollectionChanged:
		if event.SysEventVersion > 0 {
			return 0, ErrorInvalidVersion
		}
		if event.ValueLen < 12 {
			return 0, ErrorValueTooShort
		}
		return binary.BigEndian.Uint32(event.Value[8:12]), nil
	default:
		return 0, ErrorInvalidOp
	}
}

func (event *UprEvent) GetScopeId() (uint32, error) {
	switch event.SystemEvent {
	// version 0 checks
	case ScopeCreate:
		fallthrough
	case ScopeDrop:
		fallthrough
	case CollectionDrop:
		if event.SysEventVersion > 0 {
			return 0, ErrorInvalidVersion
		}
		fallthrough
	case CollectionCreate:
		// CollectionCreate could be either 0 or 1
		if event.SysEventVersion > 1 {
			return 0, ErrorInvalidVersion
		}
		if event.ValueLen < 12 {
			return 0, ErrorValueTooShort
		}
		return binary.BigEndian.Uint32(event.Value[8:12]), nil
	default:
		return 0, ErrorInvalidOp
	}
}

func (event *UprEvent) GetMaxTTL() (uint32, error) {
	switch event.SystemEvent {
	case CollectionCreate:
		if event.SysEventVersion < 1 {
			return 0, ErrorNoMaxTTL
		}
		if event.ValueLen < 20 {
			return 0, ErrorValueTooShort
		}
		return binary.BigEndian.Uint32(event.Value[16:20]), nil
	case CollectionChanged:
		if event.SysEventVersion > 0 {
			return 0, ErrorInvalidVersion
		}
		if event.ValueLen < 16 {
			return 0, ErrorValueTooShort
		}
		return binary.BigEndian.Uint32(event.Value[12:16]), nil
	default:
		return 0, ErrorInvalidOp
	}
}

// Only if error is nil:
// Returns true if event states oso begins
// Return false if event states oso ends
func (event *UprEvent) GetOsoBegin() (bool, error) {
	if !event.IsOsoSnapshot() {
		return false, ErrorInvalidOp
	}

	if event.Flags == 1 {
		return true, nil
	} else if event.Flags == 2 {
		return false, nil
	} else {
		return false, ErrorInvalidOp
	}
}

type Uleb128 []byte

func (u Uleb128) ToUint32(cachedLen int) (result uint32, bytesShifted int) {
	var shift uint = 0

	for curByte := 0; curByte < cachedLen; curByte++ {
		oneByte := u[curByte]
		last7Bits := 0x7f & oneByte
		result |= uint32(last7Bits) << shift
		bytesShifted++
		if oneByte&0x80 == 0 {
			break
		}
		shift += 7
	}

	return
}