/*
Open an existing Whisper database and read it's header
*/funcOpen(pathstring)(whisper*Whisper,errerror){file,err:=os.OpenFile(path,os.O_RDWR,0666)iferr!=nil{return}deferfunc(){iferr!=nil{whisper=nilfile.Close()}}()whisper=new(Whisper)whisper.file=fileoffset:=0// read the metadata
b:=make([]byte,MetadataSize)readed,err:=file.Read(b)iferr!=nil{err=fmt.Errorf("Unable to read header: %s",err.Error())return}ifreaded!=MetadataSize{err=fmt.Errorf("Unable to read header: EOF")return}a:=unpackInt(b[offset:offset+IntSize])ifa>1024{// support very old format. File starts with lastUpdate and has only average aggregation method
whisper.aggregationMethod=Average}else{whisper.aggregationMethod=AggregationMethod(a)}offset+=IntSizewhisper.maxRetention=unpackInt(b[offset:offset+IntSize])offset+=IntSizewhisper.xFilesFactor=unpackFloat32(b[offset:offset+FloatSize])offset+=FloatSizearchiveCount:=unpackInt(b[offset:offset+IntSize])offset+=IntSize// read the archive info
b=make([]byte,ArchiveInfoSize)whisper.archives=make([]*archiveInfo,0)fori:=0;i<archiveCount;i++{readed,err=file.Read(b)iferr!=nil||readed!=ArchiveInfoSize{err=fmt.Errorf("Unable to read archive %d metadata",i)return}whisper.archives=append(whisper.archives,unpackArchiveInfo(b))}returnwhisper,nil}
/*
Describes a time series in a file.
The only addition this type has over a Retention is the offset at which it exists within the
whisper file.
*/typearchiveInfostruct{Retentionoffsetint}
/*
Retention is a retention level.

A retention level describes a given archive in the database: how detailed it
is and how far back it records.
*/
type Retention struct {
	// secondsPerPoint is the time step of the archive; intervals are aligned
	// down to a multiple of this value.
	secondsPerPoint int

	// numberOfPoints is the number of points of the retention level
	// (presumably the archive's capacity; confirm against its users).
	numberOfPoints int
}
func(whisper*Whisper)UpdateMany(points[]*TimeSeriesPoint)(errerror){// recover panics and return as error
deferfunc(){ife:=recover();e!=nil{err=errors.New(e.(string))}}()// sort the points, newest first
reversePoints(points)sort.Stable(timeSeriesPointsNewestFirst{points})now:=int(time.Now().Unix())// TODO: danger of 2030 something overflow
varcurrentPoints[]*TimeSeriesPointfor_,archive:=rangewhisper.archives{currentPoints,points=extractPoints(points,now,archive.MaxRetention())iflen(currentPoints)==0{continue}// reverse currentPoints
reversePoints(currentPoints)err=whisper.archiveUpdateMany(archive,currentPoints)iferr!=nil{return}iflen(points)==0{// nothing left to do
break}}return}
func(whisper*Whisper)archiveUpdateMany(archive*archiveInfo,points[]*TimeSeriesPoint)error{alignedPoints:=alignPoints(archive,points)intervals,packedBlocks:=packSequences(archive,alignedPoints)baseInterval:=whisper.getBaseInterval(archive)ifbaseInterval==0{baseInterval=intervals[0]}fori:=rangeintervals{myOffset:=archive.PointOffset(baseInterval,intervals[i])bytesBeyond:=int(myOffset-archive.End())+len(packedBlocks[i])ifbytesBeyond>0{pos:=len(packedBlocks[i])-bytesBeyonderr:=whisper.fileWriteAt(packedBlocks[i][:pos],myOffset)iferr!=nil{returnerr}err=whisper.fileWriteAt(packedBlocks[i][pos:],archive.Offset())iferr!=nil{returnerr}}else{err:=whisper.fileWriteAt(packedBlocks[i],myOffset)iferr!=nil{returnerr}}}higher:=archivelowerArchives:=whisper.lowerArchives(archive)for_,lower:=rangelowerArchives{seen:=make(map[int]bool)propagateFurther:=falsefor_,point:=rangealignedPoints{interval:=point.interval-mod(point.interval,lower.secondsPerPoint)if!seen[interval]{ifpropagated,err:=whisper.propagate(interval,higher,lower);err!=nil{panic("Failed to propagate")}elseifpropagated{propagateFurther=true}}}if!propagateFurther{break}higher=lower}returnnil}