// Choose the connection handler: plain text protocol or pickle.
handler := rcv.HandleConnection
if rcv.isPickle {
	handler = rcv.handlePickle
}

// If a buffer is configured, route outgoing points through it so that
// a slow consumer does not block the connection handlers.
if rcv.buffer != nil {
	originalOut := rcv.out

	rcv.Go(func(exit chan bool) {
		for {
			select {
			case <-exit:
				return
			case p := <-rcv.buffer:
				originalOut(p)
			}
		}
	})

	rcv.out = func(p *points.Points) {
		rcv.buffer <- p
	}
}

// Accept loop: spawn one handler goroutine per connection.
rcv.Go(func(exit chan bool) {
	defer tcpListener.Close()

	for {
		conn, err := tcpListener.Accept()
		if err != nil {
			if strings.Contains(err.Error(), "use of closed network connection") {
				break
			}
			rcv.logger.Warn("failed to accept connection",
				zap.Error(err),
			)
			continue
		}

		rcv.Go(func(exit chan bool) {
			handler(conn)
		})
	}
})
// OutFunc creates an option for the New constructor that sets the
// receiver's output callback.
func OutFunc(out func(*points.Points)) Option {
	return func(r Receiver) error {
		if t, ok := r.(*TCP); ok {
			t.out = out
		}
		if t, ok := r.(*UDP); ok {
			t.out = out
		}
		return nil
	}
}
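// Usage sketch (illustrative only): wiring a receiver's output into the
// cache. The constructor signature New("tcp://:2003", ...Option) assumed
// below is hypothetical; use the actual constructor of this package.
//
//	c := cache.New()
//	rcv, err := New("tcp://:2003", OutFunc(c.Add))
//	if err != nil {
//		log.Fatal(err)
//	}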
// New creates a new Cache instance with all shards initialized.
func New() *Cache {
	c := &Cache{
		data:          make([]*Shard, shardCount),
		writeStrategy: Noop,
		maxSize:       1000000,
	}
	for i := 0; i < shardCount; i++ {
		c.data[i] = &Shard{
			items:        make(map[string]*points.Points),
			notConfirmed: make([]*points.Points, 4),
		}
	}
	c.writeoutQueue = NewWriteoutQueue(c)
	return c
}
type WriteStrategy int

const (
	MaximumLength WriteStrategy = iota
	TimestampOrder
	Noop
)

const shardCount = 1024

// Cache is a "thread"-safe map from metric name to *points.Points.
// To avoid lock bottlenecks the map is divided into several (shardCount) shards.
type Cache struct {
	sync.Mutex

	queueLastBuild time.Time

	data []*Shard

	maxSize       int32
	writeStrategy WriteStrategy

	writeoutQueue *WriteoutQueue

	xlog atomic.Value // io.Writer

	stat struct {
		size              int32  // changed via atomic
		queueBuildCnt     uint32 // number of times the writeout queue was built
		queueBuildTimeMs  uint32 // time spent building the writeout queue, in milliseconds
		queueWriteoutTime uint32 // in milliseconds
		overflowCnt       uint32 // packets dropped because the cache was full
		queryCnt          uint32 // number of queries
	}
}

// Shard is one "thread"-safe string-to-points map shard of the Cache.
type Shard struct {
	sync.RWMutex // guards access to the internal map

	items map[string]*points.Points

	notConfirmed     []*points.Points // linear search for value/slot
	notConfirmedUsed int              // search values in notConfirmed[:notConfirmedUsed]
}
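// GetShard is called throughout this file but is not shown in this excerpt.
// A minimal sketch consistent with the sharding scheme above might look like
// the following; the fnv32 helper shown is an assumption (32-bit FNV-1a) and
// the real implementation may differ:
//
//	// GetShard returns the shard responsible for the given metric key.
//	func (c *Cache) GetShard(key string) *Shard {
//		return c.data[fnv32(key)%shardCount]
//	}
//
//	// fnv32 hashes a string with the 32-bit FNV-1a algorithm.
//	func fnv32(key string) uint32 {
//		hash := uint32(2166136261)
//		const prime32 = uint32(16777619)
//		for i := 0; i < len(key); i++ {
//			hash ^= uint32(key[i])
//			hash *= prime32
//		}
//		return hash
//	}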
// Add stores the given points under their metric name, merging them into
// any points already cached for that metric.
func (c *Cache) Add(p *points.Points) {
	// Write to the xlog first, if one is configured.
	if xlog := c.xlog.Load(); xlog != nil {
		p.WriteTo(xlog.(io.Writer))
	}

	count := len(p.Data)

	// Drop the packet if the cache is full.
	if c.maxSize > 0 && c.Size() > c.maxSize {
		atomic.AddUint32(&c.stat.overflowCnt, uint32(count))
		return
	}

	// Get the map shard for this metric.
	shard := c.GetShard(p.Metric)

	shard.Lock()
	if values, exists := shard.items[p.Metric]; exists {
		values.Data = append(values.Data, p.Data...)
	} else {
		shard.items[p.Metric] = p
	}
	shard.Unlock()

	atomic.AddInt32(&c.stat.size, int32(count))
}
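// Usage sketch (illustrative only): points.OnePoint is assumed here to
// build a single-point batch; adjust to the actual constructor if it
// differs.
//
//	c := New()
//	c.Add(points.OnePoint("servers.web01.cpu.load", 0.42, time.Now().Unix()))
//	if p, exists := c.Pop("servers.web01.cpu.load"); exists {
//		fmt.Println(p.Metric, len(p.Data))
//	}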
func (q *WriteoutQueue) get(abort chan bool, pop func(key string) (p *points.Points, exists bool)) *points.Points {
QueueLoop:
	for {
		q.RLock()
		queue := q.queue
		rebuild := q.rebuild
		q.RUnlock()

	FetchLoop:
		for {
			select {
			case qp := <-queue:
				// pop from cache
				if p, exists := pop(qp.Metric); exists {
					return p
				}
				continue FetchLoop
			case <-abort:
				return nil
			default:
				// queue is empty, create a new one
				select {
				case <-rebuild(abort):
					// wait for the rebuild to finish
					continue QueueLoop
				case <-abort:
					return nil
				}
			}
		}
	}
}

func (q *WriteoutQueue) Get(abort chan bool) *points.Points {
	return q.get(abort, q.cache.Pop)
}

func (q *WriteoutQueue) GetNotConfirmed(abort chan bool) *points.Points {
	return q.get(abort, q.cache.PopNotConfirmed)
}
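// Consumer sketch (illustrative only): a persister worker draining the
// queue until aborted. Get blocks through queue rebuilds and returns nil
// only once abort is closed.
//
//	abort := make(chan bool)
//	go func() {
//		for {
//			p := q.Get(abort)
//			if p == nil {
//				return // aborted
//			}
//			// hand the batch to the persister, e.g. store(w, p)
//		}
//	}()
//	// ...
//	close(abort) // stop the worker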
// Pop removes the points stored under the given key and returns them.
func (c *Cache) Pop(key string) (p *points.Points, exists bool) {
	// Get the map shard for this key.
	shard := c.GetShard(key)

	shard.Lock()
	p, exists = shard.items[key]
	delete(shard.items, key)
	shard.Unlock()

	if exists {
		atomic.AddInt32(&c.stat.size, -int32(len(p.Data)))
	}

	return p, exists
}

// PopNotConfirmed removes the points stored under the given key, but keeps
// them in the shard's notConfirmed slice so an in-flight write can be
// replayed if it is never confirmed (the matching confirmation step is not
// shown in this excerpt).
func (c *Cache) PopNotConfirmed(key string) (p *points.Points, exists bool) {
	// Get the map shard for this key.
	shard := c.GetShard(key)

	shard.Lock()
	p, exists = shard.items[key]
	delete(shard.items, key)
	if exists {
		// Reuse a free slot if one is available, otherwise grow the slice.
		if shard.notConfirmedUsed < len(shard.notConfirmed) {
			shard.notConfirmed[shard.notConfirmedUsed] = p
		} else {
			shard.notConfirmed = append(shard.notConfirmed, p)
		}
		shard.notConfirmedUsed++
	}
	shard.Unlock()

	if exists {
		atomic.AddInt32(&c.stat.size, -int32(len(p.Data)))
	}

	return p, exists
}
func store(p *Whisper, values *points.Points) {
	// Avoid concurrent stores of the same metric.
	// @TODO: maybe flock?
	// start := time.Now()
	mutexIndex := fnv32(values.Metric) % storeMutexCount
	p.storeMutex[mutexIndex].Lock()
	// atomic.AddUint64(&p.blockAvoidConcurrentNs, uint64(time.Since(start).Nanoseconds()))
	defer p.storeMutex[mutexIndex].Unlock()

	// Map the dotted metric name onto a .wsp file path under rootPath.
	path := filepath.Join(p.rootPath, strings.Replace(values.Metric, ".", "/", -1)+".wsp")

	w, err := whisper.Open(path)
	if err != nil {
		// Create a new whisper file if it does not exist.
		if !os.IsNotExist(err) {
			p.logger.Error("failed to open whisper file", zap.String("path", path), zap.Error(err))
			return
		}

		schema, ok := p.schemas.Match(values.Metric)
		if !ok {
			p.logger.Error("no storage schema defined for metric", zap.String("metric", values.Metric))
			return
		}

		aggr := p.aggregation.match(values.Metric)
		if aggr == nil {
			p.logger.Error("no storage aggregation defined for metric", zap.String("metric", values.Metric))
			return
		}

		if err = os.MkdirAll(filepath.Dir(path), os.ModeDir|os.ModePerm); err != nil {
			p.logger.Error("mkdir failed",
				zap.String("dir", filepath.Dir(path)),
				zap.Error(err),
				zap.String("path", path),
			)
			return
		}

		w, err = whisper.CreateWithOptions(path, schema.Retentions, aggr.aggregationMethod, float32(aggr.xFilesFactor), &whisper.Options{
			Sparse: p.sparse,
		})
		if err != nil {
			p.logger.Error("create new whisper file failed",
				zap.String("path", path),
				zap.Error(err),
				zap.String("retention", schema.RetentionStr),
				zap.String("schema", schema.Name),
				zap.String("aggregation", aggr.name),
				zap.Float64("xFilesFactor", aggr.xFilesFactor),
				zap.String("method", aggr.aggregationMethodStr),
			)
			return
		}

		p.createLogger.Debug("created",
			zap.String("path", path),
			zap.String("retention", schema.RetentionStr),
			zap.String("schema", schema.Name),
			zap.String("aggregation", aggr.name),
			zap.Float64("xFilesFactor", aggr.xFilesFactor),
			zap.String("method", aggr.aggregationMethodStr),
		)

		atomic.AddUint32(&p.created, 1)
	}

	points := make([]*whisper.TimeSeriesPoint, len(values.Data))
	for i, r := range values.Data {
		points[i] = &whisper.TimeSeriesPoint{Time: int(r.Timestamp), Value: r.Value}
	}

	atomic.AddUint32(&p.committedPoints, uint32(len(values.Data)))
	atomic.AddUint32(&p.updateOperations, 1)

	defer w.Close()

	defer func() {
		if r := recover(); r != nil {
			p.logger.Error("UpdateMany panic recovered",
				zap.String("path", path),
				zap.String("traceback", fmt.Sprint(r)),
			)
		}
	}()

	// start = time.Now()
	w.UpdateMany(points)
	// atomic.AddUint64(&p.blockUpdateManyNs, uint64(time.Since(start).Nanoseconds()))
}
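// Worked example of the metric-to-path mapping used in store above
// (the rootPath value is hypothetical):
//
//	rootPath := "/var/lib/graphite/whisper"
//	metric := "servers.web01.cpu.load"
//	path := filepath.Join(rootPath, strings.Replace(metric, ".", "/", -1)+".wsp")
//	// path == "/var/lib/graphite/whisper/servers/web01/cpu/load.wsp"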