DB Log Daten aggregieren (für Postgres)

Begonnen von birdy, 19 Januar 2026, 17:11:44

Vorheriges Thema - Nächstes Thema

birdy

Meine Daten landen alle in einer Postgres DB
Da mir mein Landis & Gyr Zähler via MQTT alle 5 Sekunden Daten liefert, kommen da sehr schnell, sehr viele Daten zusammen. Um dieser Datenflut Herr zu werden, musste ich mir etwas überlegen. Ich habe mir eine Aggregationsroutine gebaut. Möglicherweise gibt es hier jemand der für meinen Code Verwendung hat. Wer möchte kann kopieren, anpassen, eränzen, ganz wie er möchte
Mein Vorgehen. Zuerst speichere ich alle Daten via DBLog in meine PostgresDB. Nach 24 Std lösche ich alle Readings die mich nicht interessieren. Nach 7 Tage Aggregiere ich auf 1 Minute, nach 30 Tage auf 5 Minuten Einheiten usw.

Voraussetzung:
Es befinden sich für den gewünschten Aggregationszeitraum und das zu aggregierende Device nur Daten in der DB, welche sich auch aggregieren lassen. Also numerische Werten aus welchen ein Mittelwert gebildet werden kann.
Anmerkungen:
  • Die vielen unaggregierten Einzelwerte werden in der DB durch die aggregierten Werte ersetzt 
  • Ich aggregiere meine Daten täglich, so muss ich jeweils nur für 1 Tag aggregieren (Laufzeit). Es ist aber jeder beliebige Intervall von n-Tagen möglich.
  • Die Aggregation muss pro Device aufgerufen werden.
  • Alle vorhandenen Readings werden, einzeln pro Reading aggregiert. Viele Readings für ein Device  verlängern somit die Laufzeit.
  • Im Code befinden sich viele Print Statements. Ich empfehle diejenigen welche nicht gebraucht werden zu löschen oder in Kommentar zu setzen.

Vorgehen

Via AT einen Timer definieren um die Aggregations Funktion aufzurufen
defmod at_aggr_MQTT2_AMSreader_2 at *04:00:00 {AgrLog(30,1,5,'MQTT2_AMSreader')}Die Aufrufparameter sind im Code beschrieben.

Die folgende Sub in 99_myUtils.pm einfügen, und in Coce deine Zugangsdaten eintragen

# Datenbankverbindung herstellen
my $dbh = DBI->connect("dbi:Pg:database=postgres;
                        host=localhost",
                  "DEINE_USERID",
                  "DEIN_PASSOWRT")

##########################################################################
#   Agregiere Log
sub AgrLog ($$$$)
{

my ($olderthan,$days,$agregTo,$agrdev) = @_;

# possible values for agregTo 1, 2, 3,  4, 5, 6, 10, 12, 15, 20, 30   
if (($agregTo==1)  or ($agregTo==2)  or  ($agregTo==3)  or ($agregTo==4)  or ($agregTo==5)  or ($agregTo==6) or
    ($agregTo==10) or ($agregTo==12) or ($agregTo==15) or ($agregTo==20) or ($agregTo==30)){
   # Input ok
}
else {
   die "Inpunt Value = $agregTo for agregTo not possible ";
}

my$currentDT =     FmtDateTime(time());
print "\n";
print "----------------- $currentDT  ---------------------- \n";
print "Daten älter als  $olderthan Tage werden aggregiert   \n";
print "Es werden $days zürückliegende Tage aggergiert       \n";
print "Daten werden zu $agregTo Minuten Eiheiten aggregiert \n";
print "Daten für $agrdev werden aggregiert                  \n";

# --- Zeit zusammen stellen
my$currentD  =  DfromDT( FmtDateTime(time()) );
print "currentDT   = $currentDT  \n";
print "currentD    = $currentD   \n";

my ($year, $month, $day) = split("-", $currentD );

#print "year  = $year  \n";
#print "month = $month \n";
#print "day   = $day   \n";

$month--;   # Monate in Time::Local sind 0-basiert (Januar = 0)

my $end_epoch_time = timelocal(0, 0, 0, $day, $month, $year);
   $end_epoch_time = $end_epoch_time - ($olderthan * 86400);
my $end_hmr_time   = FmtDateTime($end_epoch_time) ;          #Human readable

my $start_epoch_time  =  $end_epoch_time - ($days * 86400);
my $start_hmr_time      =  FmtDateTime($start_epoch_time) ;  #Human readable

print "Daten zwischen $start_hmr_time und $end_hmr_time werden aggregiert  \n";
# --- ENDE  Zeit zusammen stellen

if ($agrdev eq 'T' ){
print "kein Device, nur Zeitberechnung\n";
return;
}


# optional Zugangsdaten für DB Verbundung aus File auslesen
#  /opt/fhem/db.conf 
# implementation momentan noch ausstehend

# Datenban Variablen + zischenspeicher
my $sth; 
my $tabExist;

# Datenbankverbindung herstellen
my $dbh = DBI->connect("dbi:Pg:database=postgres;
                        host=localhost",
"DEINE_USERID",
"DEIN_PASSWORT")
  or die "Verbindung fehlgeschlagen: " . DBI->errstr;


#---- Prüfen ob work Tabelle bereis vorhanden, und allenfalls anlegen ------------------------
# SQL-Abfrage erstellen und ausführen
$sth = $dbh->prepare("select exists (select 1 from pg_tables where schemaname ='fhem' and pg_tables.tablename = 'work');")
  or die "SQL prepare fehlgeschlagen: " . $dbh->errstr;
 
$sth->execute()
  or die "SQL execute fehlgeschlagen: " . $sth->errstr;

# Ergebnisse abrufen und ausgeben
while (my @row = $sth->fetchrow_array) {
  $tabExist = $row[0];
}
  if ($tabExist ==1) {
print " Work Tabbelle bereits vorhanden\n";
  }
  else{
print "Work Tabbelle jetzt anlegen\n";

# SQL-Abfrage erstellen und ausführen
    $sth = $dbh->prepare("CREATE TABLE work (
    timestamp timestamp without time zone default CURRENT_TIMESTAMP,
    device character varying(64),
    type character varying(64),
    event character varying(512),
    reading character varying(64),
    value character varying(128),
    unit character varying(32)
    );")
       or die "SQL prepare fehlgeschlagen: " . $dbh->errstr;

$sth->execute()
      or die "SQL execute fehlgeschlagen: " . $sth->errstr;


# SQL-Abfrage erstellen und ausführen
    $sth = $dbh->prepare("ALTER TABLE fhem.work OWNER TO fhem;")
      or die "SQL prepare fehlgeschlagen: " . $dbh->errstr;
   
$sth->execute()
      or die "SQL execute fehlgeschlagen: " . $sth->errstr;


    # SQL-Abfrage erstellen und ausführen
    $sth = $dbh->prepare("CREATE INDEX Search_IdxW ON work USING btree (device, reading, timestamp);")
      or die "SQL prepare fehlgeschlagen: " . $dbh->errstr;


$sth->execute()
      or die "SQL execute fehlgeschlagen: " . $sth->errstr;   


print "Tabbelle ist erstellt\n";
 
} # end Tabelle anlegen
#----   end Tabelle anlegen



# zu aggregierende reading für gewünsches device ermitteln
# SQL-Abfrage erstellen und ausführen
$sth = $dbh->prepare("SELECT DISTINCT READING FROM FHEM.HISTORY WHERE DEVICE = ? AND TIMESTAMP BETWEEN ? AND ? ")
   or die "SQL prepare fehlgeschlagen: " . $dbh->errstr;
 
$sth->execute($agrdev, $start_hmr_time, $end_hmr_time )
   or die "SQL execute fehlgeschlagen: " . $sth->errstr;
 
print "Number of Readings found for $agrdev = ".$sth->rows ."\n";

my @readingList;

while (my @row = $sth -> fetchrow_array()){
push @readingList, $row[0] ;
}

# zur Kontrolle alle ermittelte Readings ausgeben
foreach my $reading (@readingList){
print " Reading aus readingList = $reading \n";
}


my $loopVar= $start_epoch_time ;
my $loopVar2;
my ($Ins0, $Ins1, $Ins2, $Ins3, $Ins4, $Ins5, $Ins6 );


#Aggregation und insert in Work Tabelle   
do {# Aggregation: loop über Zeit Einheiten
$loopVar2 = $loopVar +($agregTo*60);
#print " aggregiere von ".FmtDateTime($loopVar)." bis ".FmtDateTime($loopVar2)." \n"; 

# pro Zeiteinheit loop über alle Readings
foreach my $reading (@readingList){
   #print "READING: " .$reading . "\n";
   print " aggregiere von ".FmtDateTime($loopVar)." bis ".FmtDateTime($loopVar2). " für   $reading   \n"; 
   #Aggregate SQL
    $sth = $dbh->prepare("INSERT INTO
FHEM.WORK (
TIMESTAMP,
DEVICE,
TYPE,
EVENT,
READING,
VALUE,
UNIT
)
SELECT
MIN(TIMESTAMP) AS TIMESTAMP,
DEVICE,
TYPE,
NULL AS EVENT,
READING,
AVG(CAST(VALUE AS DOUBLE PRECISION)),
NULL AS UNIT
FROM
FHEM.HISTORY
WHERE
DEVICE = ?
AND READING = ?
AND TIMESTAMP BETWEEN ? AND ?
GROUP BY
DEVICE,
TYPE,
READING;")
            or die "SQL prepare fehlgeschlagen: " . $dbh->errstr;
 
        $sth->execute($agrdev, $reading, FmtDateTime($loopVar), FmtDateTime($loopVar2) )
          or die "SQL execute fehlgeschlagen: " . $sth->errstr;
 


    }
$loopVar = $loopVar2;

} while (  $loopVar2 < $end_epoch_time );

#Unaggregierte Daten in Histroy Tabbelle löschen
$currentDT =     FmtDateTime(time()); 
print "----- start Delete Data form  History  Table  --- $currentDT  ------- \n";
$sth = $dbh->prepare("Delete FROM FHEM.HISTORY WHERE DEVICE = ? AND TIMESTAMP BETWEEN ? AND ? ")
   or die "SQL prepare fehlgeschlagen: " . $dbh->errstr;
 
$sth->execute($agrdev, $start_hmr_time, $end_hmr_time )
   or die "SQL execute fehlgeschlagen: " . $sth->errstr;
   

# Aggregierte Daten von Work in History Tabelle übertragen
$currentDT =     FmtDateTime(time()); 
print "----- start copy work to History   --- $currentDT  -------------- \n";
$sth = $dbh->prepare("INSERT INTO FHEM.History (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT)
                                  SELECT TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT
                                  FROM FHEM.work ")
   or die "SQL prepare fehlgeschlagen: " . $dbh->errstr;
 
$sth->execute()
   or die "SQL execute fehlgeschlagen: " . $sth->errstr;
   
   
# Daten in Work Tabbelle löschen
$currentDT =     FmtDateTime(time()); 
print "----- start Delete Data form  work  Table  --- $currentDT  ------- \n";
$sth = $dbh->prepare("Delete FROM FHEM.work ")
   or die "SQL prepare fehlgeschlagen: " . $dbh->errstr;
 
$sth->execute( )
   or die "SQL execute fehlgeschlagen: " . $sth->errstr;


# DB Verbindung schließen
$sth->finish;
$dbh->disconnect;
 
$currentDT =     FmtDateTime(time()); 
print "----------------- $currentDT  ---------------------- \n";

} #END Agr Log ---


##############################################################################
##

Viel Spass beim aggregieren, Bitte denkt daran, ich übernehme keine Verantwortung für Eure Daten Ihr macht das auf eigene Gefahr.

FHEM  @Debian Trixie @Proxmox VE
GMKtec mit AMD Ryzen 7 5700U
CUL 433(a-culfw), CUL 868(SlowRF), Max-Cube CUN geflash, HM-CFG-USB-2 (HMALND)

betateilchen

Das kann doch FHEM mittels DbRep bestimmt von Haus aus?

In der commandref dazu steht:

ZitatCurrently the following functions are provided:

...

- Calculation of quantity of datasets of a Device/Reading-combination within adjustable time limits and several aggregations.
- The calculation of summary-, difference-, maximum-, minimum- and averageValues of numeric readings within adjustable time limits and several aggregations.

...
-----------------------
Formuliere die Aufgabe möglichst einfach und
setze die Lösung richtig um - dann wird es auch funktionieren.
-----------------------
Lesen gefährdet die Unwissenheit!

birdy

Möglicherweise hast du da etwas falsch verstanden.
Es geht nicht darum irgendwelche Summen/Mittelwerte zu bilden.
Es geht darum das Log zu aggregieren bzw. zu verkleinern. Also viele einzelne Einträge aus der DB zu entfernen und durch einen / wenige aggregierte Einträge zu ersetzen. Also reduzieren der Anzahl der Datenpunkte für die Langzeitaufbewarung.
Entsprich von der Idee, dieser Funktion.
  • reduce the amount of datasets in database (reduceLog)
FHEM  @Debian Trixie @Proxmox VE
GMKtec mit AMD Ryzen 7 5700U
CUL 433(a-culfw), CUL 868(SlowRF), Max-Cube CUN geflash, HM-CFG-USB-2 (HMALND)

betateilchen

Zitat von: birdy am 19 Januar 2026, 19:29:51Es geht darum das Log zu aggregieren bzw. zu verkleinern. Also viele einzelne Einträge aus der DB zu entfernen

Auch das geht mit DbRep, ich mache sowas jeden Monatsletzten um 23:59 Uhr, um bestimmte Kurswerte, die ich alle 10 Minuten erhalte, aus dem vorletzten Monat auf einen Mittelwert pro Tag zu reduzieren.

defmod at_reduceLog2 at *{at_ultimo} set dbrep_2 reduceLog average=day

defmod dbrep_2 DbRep logDb2
attr dbrep_2 comment used for reducelog on ultimo
attr dbrep_2 timestamp_begin previous_month_begin
attr dbrep_2 timestamp_end previous_month_end

Anstatt Mittelwert kann man selbstverständlich auch andere Werte verwenden.
-----------------------
Formuliere die Aufgabe möglichst einfach und
setze die Lösung richtig um - dann wird es auch funktionieren.
-----------------------
Lesen gefährdet die Unwissenheit!

birdy

Ja, wenn du nach dem Reduzieren mit einem Wert pro Tag zufrieden bist.
Ich sehe Du hast dieses Problem gar nicht, oder aber nicht verstanden.
Bei einer Vielzahl von Werten macht die Aggregation auf nur einen Wert pro Tag keinen Sinn. Ich selbst kenne keinen Sensoren bei dem es Sinn macht die Daten für die Lanzeitspeicherung auf einen einzigen Wert pro Tag zu reduzieren....??
Bei mir sind es die Powerwerte von Netzbezug, Solar usw.
Die brauche ich nicht über Jahre im 5 Sekundentakt. Aber einen Verlauf über den Tage möchte durchaus sehen.
FHEM  @Debian Trixie @Proxmox VE
GMKtec mit AMD Ryzen 7 5700U
CUL 433(a-culfw), CUL 868(SlowRF), Max-Cube CUN geflash, HM-CFG-USB-2 (HMALND)