Cache Netatmo data in memory

This commit is contained in:
Robert Jacob 2020-06-27 17:46:02 +02:00
parent 104816fd0c
commit 9416c8b92f
2 changed files with 72 additions and 19 deletions

View file

@ -1,6 +1,7 @@
package main package main
import ( import (
"sync"
"time" "time"
netatmo "github.com/exzz/netatmo-api-go" netatmo "github.com/exzz/netatmo-api-go"
@ -11,7 +12,18 @@ import (
var ( var (
prefix = "netatmo_" prefix = "netatmo_"
netatmoUpDesc = prometheus.NewDesc(prefix+"up", netatmoUpDesc = prometheus.NewDesc(prefix+"up",
"Zero if there was an error scraping the Netatmo API.", "Zero if there was an error during the last refresh try.",
nil, nil)
refreshPrefix = prefix + "last_refresh"
refreshTimestampDesc = prometheus.NewDesc(
refreshPrefix+"_time",
"Contains the time of the last refresh try, successful or not.",
nil, nil)
cacheTimestampDesc = prometheus.NewDesc(
prefix+"cache_updated_time",
"Contains the time of the cached data.",
nil, nil) nil, nil)
varLabels = []string{ varLabels = []string{
@ -94,8 +106,14 @@ var (
type netatmoCollector struct { type netatmoCollector struct {
log logrus.FieldLogger log logrus.FieldLogger
refreshInterval time.Duration
staleThreshold time.Duration staleThreshold time.Duration
client *netatmo.Client client *netatmo.Client
lastRefresh time.Time
lastRefreshError error
cacheLock sync.RWMutex
cacheTimestamp time.Time
cachedData *netatmo.DeviceCollection
} }
func (c *netatmoCollector) Describe(dChan chan<- *prometheus.Desc) { func (c *netatmoCollector) Describe(dChan chan<- *prometheus.Desc) {
@ -106,16 +124,24 @@ func (c *netatmoCollector) Describe(dChan chan<- *prometheus.Desc) {
} }
func (c *netatmoCollector) Collect(mChan chan<- prometheus.Metric) { func (c *netatmoCollector) Collect(mChan chan<- prometheus.Metric) {
devices, err := c.client.Read() now := time.Now()
if err != nil { if now.Sub(c.lastRefresh) >= c.refreshInterval {
c.log.Errorf("Error getting data: %s", err) go c.refreshData(now)
c.sendMetric(mChan, netatmoUpDesc, prometheus.GaugeValue, 0.0)
return
} }
c.sendMetric(mChan, netatmoUpDesc, prometheus.GaugeValue, 1.0)
for _, dev := range devices.Devices() { upValue := 1.0
if c.lastRefresh.IsZero() || c.lastRefreshError != nil {
upValue = 0
}
c.sendMetric(mChan, netatmoUpDesc, prometheus.GaugeValue, upValue)
c.sendMetric(mChan, refreshTimestampDesc, prometheus.GaugeValue, convertTime(c.lastRefresh))
c.cacheLock.RLock()
defer c.cacheLock.RUnlock()
c.sendMetric(mChan, cacheTimestampDesc, prometheus.GaugeValue, convertTime(c.cacheTimestamp))
if c.cachedData != nil {
for _, dev := range c.cachedData.Devices() {
stationName := dev.StationName stationName := dev.StationName
c.collectData(mChan, dev, stationName) c.collectData(mChan, dev, stationName)
@ -124,6 +150,24 @@ func (c *netatmoCollector) Collect(mChan chan<- prometheus.Metric) {
} }
} }
} }
}
func (c *netatmoCollector) refreshData(now time.Time) {
c.log.Debugf("Refresh interval elapsed: %s > %s", now.Sub(c.lastRefresh), c.refreshInterval)
c.lastRefresh = now
devices, err := c.client.Read()
if err != nil {
c.log.Errorf("Error during refresh: %s", err)
c.lastRefreshError = err
return
}
c.cacheLock.Lock()
defer c.cacheLock.Unlock()
c.cacheTimestamp = now
c.cachedData = devices
}
func (c *netatmoCollector) collectData(ch chan<- prometheus.Metric, device *netatmo.Device, stationName string) { func (c *netatmoCollector) collectData(ch chan<- prometheus.Metric, device *netatmo.Device, stationName string) {
moduleName := device.ModuleName moduleName := device.ModuleName
@ -193,3 +237,11 @@ func (c *netatmoCollector) sendMetric(ch chan<- prometheus.Metric, desc *prometh
} }
ch <- m ch <- m
} }
func convertTime(t time.Time) float64 {
if t.IsZero() {
return 0.0
}
return float64(t.Unix())
}

View file

@ -38,6 +38,7 @@ func main() {
metrics := &netatmoCollector{ metrics := &netatmoCollector{
log: log, log: log,
client: client, client: client,
refreshInterval: cfg.RefreshInterval,
staleThreshold: cfg.StaleDuration, staleThreshold: cfg.StaleDuration,
} }
prometheus.MustRegister(metrics) prometheus.MustRegister(metrics)