FHEM Forum

FHEM => Codeschnipsel => Thema gestartet von: birdy am 19 Januar 2026, 17:11:44

Titel: DB Log Daten aggregieren (für Postgres)
Beitrag von: birdy am 19 Januar 2026, 17:11:44
Meine Daten landen alle in einer Postgres DB
Da mir mein Landis & Gyr Zähler via MQTT alle 5 Sekunden Daten liefert, kommen da sehr schnell, sehr viele Daten zusammen. Um dieser Datenflut Herr zu werden, musste ich mir etwas überlegen. Ich habe mir eine Aggregationsroutine gebaut. Möglicherweise gibt es hier jemand der für meinen Code Verwendung hat. Wer möchte kann kopieren, anpassen, eränzen, ganz wie er möchte
Mein Vorgehen. Zuerst speichere ich alle Daten via DBLog in meine PostgresDB. Nach 24 Std lösche ich alle Readings die mich nicht interessieren. Nach 7 Tage Aggregiere ich auf 1 Minute, nach 30 Tage auf 5 Minuten Einheiten usw.

Voraussetzung:
Es befinden sich für den gewünschten Aggregationszeitraum und das zu aggregierende Device nur Daten in der DB, welche sich auch aggregieren lassen. Also numerische Werten aus welchen ein Mittelwert gebildet werden kann.
Anmerkungen:

Vorgehen

Via AT einen Timer definieren um die Aggregations Funktion aufzurufen
defmod at_aggr_MQTT2_AMSreader_2 at *04:00:00 {AgrLog(30,1,5,'MQTT2_AMSreader')}Die Aufrufparameter sind im Code beschrieben.

Die folgende Sub in 99_myUtils.pm einfügen, und in Coce deine Zugangsdaten eintragen

# Datenbankverbindung herstellen
my $dbh = DBI->connect("dbi:Pg:database=postgres;
                        host=localhost",
                  "DEINE_USERID",
                  "DEIN_PASSOWRT")

##########################################################################
#   Agregiere Log
sub AgrLog ($$$$)
{

my ($olderthan,$days,$agregTo,$agrdev) = @_;

# possible values for agregTo 1, 2, 3,  4, 5, 6, 10, 12, 15, 20, 30   
if (($agregTo==1)  or ($agregTo==2)  or  ($agregTo==3)  or ($agregTo==4)  or ($agregTo==5)  or ($agregTo==6) or
    ($agregTo==10) or ($agregTo==12) or ($agregTo==15) or ($agregTo==20) or ($agregTo==30)){
   # Input ok
}
else {
   die "Inpunt Value = $agregTo for agregTo not possible ";
}

my$currentDT =     FmtDateTime(time());
print "\n";
print "----------------- $currentDT  ---------------------- \n";
print "Daten älter als  $olderthan Tage werden aggregiert   \n";
print "Es werden $days zürückliegende Tage aggergiert       \n";
print "Daten werden zu $agregTo Minuten Eiheiten aggregiert \n";
print "Daten für $agrdev werden aggregiert                  \n";

# --- Zeit zusammen stellen
my$currentD  =  DfromDT( FmtDateTime(time()) );
print "currentDT   = $currentDT  \n";
print "currentD    = $currentD   \n";

my ($year, $month, $day) = split("-", $currentD );

#print "year  = $year  \n";
#print "month = $month \n";
#print "day   = $day   \n";

$month--;   # Monate in Time::Local sind 0-basiert (Januar = 0)

my $end_epoch_time = timelocal(0, 0, 0, $day, $month, $year);
   $end_epoch_time = $end_epoch_time - ($olderthan * 86400);
my $end_hmr_time   = FmtDateTime($end_epoch_time) ;          #Human readable

my $start_epoch_time  =  $end_epoch_time - ($days * 86400);
my $start_hmr_time      =  FmtDateTime($start_epoch_time) ;  #Human readable

print "Daten zwischen $start_hmr_time und $end_hmr_time werden aggregiert  \n";
# --- ENDE  Zeit zusammen stellen

if ($agrdev eq 'T' ){
print "kein Device, nur Zeitberechnung\n";
return;
}


# optional Zugangsdaten für DB Verbundung aus File auslesen
#  /opt/fhem/db.conf 
# implementation momentan noch ausstehend

# Datenban Variablen + zischenspeicher
my $sth; 
my $tabExist;

# Datenbankverbindung herstellen
my $dbh = DBI->connect("dbi:Pg:database=postgres;
                        host=localhost",
"DEINE_USERID",
"DEIN_PASSWORT")
  or die "Verbindung fehlgeschlagen: " . DBI->errstr;


#---- Prüfen ob work Tabelle bereis vorhanden, und allenfalls anlegen ------------------------
# SQL-Abfrage erstellen und ausführen
$sth = $dbh->prepare("select exists (select 1 from pg_tables where schemaname ='fhem' and pg_tables.tablename = 'work');")
  or die "SQL prepare fehlgeschlagen: " . $dbh->errstr;
 
$sth->execute()
  or die "SQL execute fehlgeschlagen: " . $sth->errstr;

# Ergebnisse abrufen und ausgeben
while (my @row = $sth->fetchrow_array) {
  $tabExist = $row[0];
}
  if ($tabExist ==1) {
print " Work Tabbelle bereits vorhanden\n";
  }
  else{
print "Work Tabbelle jetzt anlegen\n";

# SQL-Abfrage erstellen und ausführen
    $sth = $dbh->prepare("CREATE TABLE work (
    timestamp timestamp without time zone default CURRENT_TIMESTAMP,
    device character varying(64),
    type character varying(64),
    event character varying(512),
    reading character varying(64),
    value character varying(128),
    unit character varying(32)
    );")
       or die "SQL prepare fehlgeschlagen: " . $dbh->errstr;

$sth->execute()
      or die "SQL execute fehlgeschlagen: " . $sth->errstr;


# SQL-Abfrage erstellen und ausführen
    $sth = $dbh->prepare("ALTER TABLE fhem.work OWNER TO fhem;")
      or die "SQL prepare fehlgeschlagen: " . $dbh->errstr;
   
$sth->execute()
      or die "SQL execute fehlgeschlagen: " . $sth->errstr;


    # SQL-Abfrage erstellen und ausführen
    $sth = $dbh->prepare("CREATE INDEX Search_IdxW ON work USING btree (device, reading, timestamp);")
      or die "SQL prepare fehlgeschlagen: " . $dbh->errstr;


$sth->execute()
      or die "SQL execute fehlgeschlagen: " . $sth->errstr;   


print "Tabbelle ist erstellt\n";
 
} # end Tabelle anlegen
#----   end Tabelle anlegen



# zu aggregierende reading für gewünsches device ermitteln
# SQL-Abfrage erstellen und ausführen
$sth = $dbh->prepare("SELECT DISTINCT READING FROM FHEM.HISTORY WHERE DEVICE = ? AND TIMESTAMP BETWEEN ? AND ? ")
   or die "SQL prepare fehlgeschlagen: " . $dbh->errstr;
 
$sth->execute($agrdev, $start_hmr_time, $end_hmr_time )
   or die "SQL execute fehlgeschlagen: " . $sth->errstr;
 
print "Number of Readings found for $agrdev = ".$sth->rows ."\n";

my @readingList;

while (my @row = $sth -> fetchrow_array()){
push @readingList, $row[0] ;
}

# zur Kontrolle alle ermittelte Readings ausgeben
foreach my $reading (@readingList){
print " Reading aus readingList = $reading \n";
}


my $loopVar= $start_epoch_time ;
my $loopVar2;
my ($Ins0, $Ins1, $Ins2, $Ins3, $Ins4, $Ins5, $Ins6 );


#Aggregation und insert in Work Tabelle   
do {# Aggregation: loop über Zeit Einheiten
$loopVar2 = $loopVar +($agregTo*60);
#print " aggregiere von ".FmtDateTime($loopVar)." bis ".FmtDateTime($loopVar2)." \n"; 

# pro Zeiteinheit loop über alle Readings
foreach my $reading (@readingList){
   #print "READING: " .$reading . "\n";
   print " aggregiere von ".FmtDateTime($loopVar)." bis ".FmtDateTime($loopVar2). " für   $reading   \n"; 
   #Aggregate SQL
    $sth = $dbh->prepare("INSERT INTO
FHEM.WORK (
TIMESTAMP,
DEVICE,
TYPE,
EVENT,
READING,
VALUE,
UNIT
)
SELECT
MIN(TIMESTAMP) AS TIMESTAMP,
DEVICE,
TYPE,
NULL AS EVENT,
READING,
AVG(CAST(VALUE AS DOUBLE PRECISION)),
NULL AS UNIT
FROM
FHEM.HISTORY
WHERE
DEVICE = ?
AND READING = ?
AND TIMESTAMP BETWEEN ? AND ?
GROUP BY
DEVICE,
TYPE,
READING;")
            or die "SQL prepare fehlgeschlagen: " . $dbh->errstr;
 
        $sth->execute($agrdev, $reading, FmtDateTime($loopVar), FmtDateTime($loopVar2) )
          or die "SQL execute fehlgeschlagen: " . $sth->errstr;
 


    }
$loopVar = $loopVar2;

} while (  $loopVar2 < $end_epoch_time );

#Unaggregierte Daten in Histroy Tabbelle löschen
$currentDT =     FmtDateTime(time()); 
print "----- start Delete Data form  History  Table  --- $currentDT  ------- \n";
$sth = $dbh->prepare("Delete FROM FHEM.HISTORY WHERE DEVICE = ? AND TIMESTAMP BETWEEN ? AND ? ")
   or die "SQL prepare fehlgeschlagen: " . $dbh->errstr;
 
$sth->execute($agrdev, $start_hmr_time, $end_hmr_time )
   or die "SQL execute fehlgeschlagen: " . $sth->errstr;
   

# Aggregierte Daten von Work in History Tabelle übertragen
$currentDT =     FmtDateTime(time()); 
print "----- start copy work to History   --- $currentDT  -------------- \n";
$sth = $dbh->prepare("INSERT INTO FHEM.History (TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT)
                                  SELECT TIMESTAMP, DEVICE, TYPE, EVENT, READING, VALUE, UNIT
                                  FROM FHEM.work ")
   or die "SQL prepare fehlgeschlagen: " . $dbh->errstr;
 
$sth->execute()
   or die "SQL execute fehlgeschlagen: " . $sth->errstr;
   
   
# Daten in Work Tabbelle löschen
$currentDT =     FmtDateTime(time()); 
print "----- start Delete Data form  work  Table  --- $currentDT  ------- \n";
$sth = $dbh->prepare("Delete FROM FHEM.work ")
   or die "SQL prepare fehlgeschlagen: " . $dbh->errstr;
 
$sth->execute( )
   or die "SQL execute fehlgeschlagen: " . $sth->errstr;


# DB Verbindung schließen
$sth->finish;
$dbh->disconnect;
 
$currentDT =     FmtDateTime(time()); 
print "----------------- $currentDT  ---------------------- \n";

} #END Agr Log ---


##############################################################################
##

Viel Spass beim aggregieren, Bitte denkt daran, ich übernehme keine Verantwortung für Eure Daten Ihr macht das auf eigene Gefahr.

Titel: Aw: DB Log Daten aggregieren (für Postgres)
Beitrag von: betateilchen am 19 Januar 2026, 18:15:44
Das kann doch FHEM mittels DbRep bestimmt von Haus aus?

In der commandref dazu steht:

ZitatCurrently the following functions are provided:

...

- Calculation of quantity of datasets of a Device/Reading-combination within adjustable time limits and several aggregations.
- The calculation of summary-, difference-, maximum-, minimum- and averageValues of numeric readings within adjustable time limits and several aggregations.

...
Titel: Aw: DB Log Daten aggregieren (für Postgres)
Beitrag von: birdy am 19 Januar 2026, 19:29:51
Möglicherweise hast du da etwas falsch verstanden.
Es geht nicht darum irgendwelche Summen/Mittelwerte zu bilden.
Es geht darum das Log zu aggregieren bzw. zu verkleinern. Also viele einzelne Einträge aus der DB zu entfernen und durch einen / wenige aggregierte Einträge zu ersetzen. Also reduzieren der Anzahl der Datenpunkte für die Langzeitaufbewarung.
Entsprich von der Idee, dieser Funktion.
Titel: Aw: DB Log Daten aggregieren (für Postgres)
Beitrag von: betateilchen am 19 Januar 2026, 19:43:48
Zitat von: birdy am 19 Januar 2026, 19:29:51Es geht darum das Log zu aggregieren bzw. zu verkleinern. Also viele einzelne Einträge aus der DB zu entfernen

Auch das geht mit DbRep, ich mache sowas jeden Monatsletzten um 23:59 Uhr, um bestimmte Kurswerte, die ich alle 10 Minuten erhalte, aus dem vorletzten Monat auf einen Mittelwert pro Tag zu reduzieren.

defmod at_reduceLog2 at *{at_ultimo} set dbrep_2 reduceLog average=day

defmod dbrep_2 DbRep logDb2
attr dbrep_2 comment used for reducelog on ultimo
attr dbrep_2 timestamp_begin previous_month_begin
attr dbrep_2 timestamp_end previous_month_end

Anstatt Mittelwert kann man selbstverständlich auch andere Werte verwenden.
Titel: Aw: DB Log Daten aggregieren (für Postgres)
Beitrag von: birdy am 19 Januar 2026, 20:08:49
Ja, wenn du nach dem Reduzieren mit einem Wert pro Tag zufrieden bist.
Ich sehe Du hast dieses Problem gar nicht, oder aber nicht verstanden.
Bei einer Vielzahl von Werten macht die Aggregation auf nur einen Wert pro Tag keinen Sinn. Ich selbst kenne keinen Sensoren bei dem es Sinn macht die Daten für die Lanzeitspeicherung auf einen einzigen Wert pro Tag zu reduzieren....??
Bei mir sind es die Powerwerte von Netzbezug, Solar usw.
Die brauche ich nicht über Jahre im 5 Sekundentakt. Aber einen Verlauf über den Tage möchte durchaus sehen.