Asynchrone Verarbeitung / Multiprocessing

Begonnen von Loredo, 27 Dezember 2013, 19:24:50

Vorheriges Thema - Nächstes Thema

Loredo

Hallo zusammen,


wie ist denn der allgemeine Stand zum Thema Multiprocessing? Ist das noch aktuell:
http://www.fhemwiki.de/wiki/DevelopmentGuidelines#Mechanismus_fuer_langsame_Readings


Hintergrund: Ich würde die Poll-Abfragen bei meinen Modulen gerne asynchron gestalten, mich dabei allerdings natürlich daran orientieren, wie bisherige Lösungen aussehen.
Habe per Suche im Quellcode jetzt nichts in der Richtung finden können, wie es dort in dem Artikel beschrieben ist. Daher dachte ich, ich frage besser mal nach :-)




Gruß
Julian
Hat meine Arbeit dir geholfen? ⟹ https://paypal.me/pools/c/8gDLrIWrG9

Maintainer:
FHEM-Docker Image, https://github.com/fhem, Astro(Co-Maintainer), ENIGMA2, GEOFANCY, GUEST, HP1000, Installer, LaMetric2, MSG, msgConfig, npmjs, PET, PHTV, Pushover, RESIDENTS, ROOMMATE, search, THINKINGCLEANER

ntruchsess

Du kannst Dir mal ansehen, wie ich das im asynchronen OWX mit einem Hintergrundthread gelöst habe: 10_OWX.pm läuft im wesentlichen im FHEM-hauptthread und kommuniziert über eine Queue mit 11_OWX_Executor.pm.

Funktioniert im Prinzip fast einwandfrei. Aber eben nur fast - beim Herunterfahren bzw. Neustarten des Backgroundthreads schmiert mir (und anderen, die es getestet haben) der komplette Perl-prozess mit einer Access-violation ab. Solange der Thread arbeiten soll und man die Konfiguration nicht online ändert, dann funzt es prima.

Ich hab deshalb mal eine Basisklasse für ein auf Forken basiertes Asynchrones Device geschrieben: AsyncDevice.pm, die mit dem FHEM-prozess über Sockets kommuniziert. Davon kannst Du dich gerne inspirieren lassen - das Ganze ist aber leider noch nicht 100% ausentwickelt. Außerdem gefällt mir das Forken von Hintergrundprozessen auch nicht wirklich, weil es auf kleinen Geräten wie z.B. Fritzboxen ggf zu viel Speicher frisst.

Als Alternative schaue ich mir grade an, wie man das Lesen bei einer eigentlich synchronen Kommunikation sauber asynchron in die FHEM-mainloop (über die selectlist) verlagert. Wenn die Anwortdaten sich nicht ohne Wissen über den Request sauber parsen lassen braucht man im Modul eine Art state-machine, damit das Modul beim Aufruf der ReadFn immer weiß, auf was da grade die Antwort reinkommt, bzw. wie viel von dieser Antwort schon da ist. Eine elegante Methode wäre das so wie mit Protothreads zu machen, dann sähe der Code fast genauso wie der synchrone code aus, man würde die jeweilige Methode nur nach Abschicken eines Request verlassen um beim nächsten Aufruf an gleicher Stelle weiterzumachen. (Zwischendurch war dann mal die Fhem-mainloop dran und hat ggf. schon die erwarteten Daten lesen lassen). Ich experimentiere grade mit verschiedenen Patterns rum, wie man das kompakt und wiederverwendbar machen kann. Das kann z.B. so aussehen:


package ProtoThreads;

use strict;
use warnings;
use Template;

use constant {
  PT_WAITING => 0,
  PT_EXITED => 1,
  PT_ENDED => 2,
  PT_YIELDED => 3,
};

our %threads = ();

sub execute($$) {
  my ($thread,$cond) = @_;
 
  #PT_BEGIN($thread);
  PT_THREAD: { my $PT_THREAD_STATE = $thread->{state}; $PT_THREAD_STATE eq 0 and do {
      print "first\n";
    #PT_YIELD
    $thread->{state} = __LINE__; return PT_YIELDED; }; $PT_THREAD_STATE eq __LINE__ and do {
      print "second\n";
    #PT_YIELD_UNTIL($cond)
    $thread->{state} = __LINE__; return PT_YIELDED; }; $PT_THREAD_STATE eq __LINE__ and do { return PT_YIELDED unless ($cond);
      print "third\n";
    #PT_WAIT_UNTIL(pt, condition)
    $thread->{state} = __LINE__;}; $PT_THREAD_STATE eq __LINE__ and do { return PT_WAITING unless ($cond);
      print "third\n";
    #PT_WAIT_WHILE(pt, cond)
    $thread->{state} = __LINE__;}; $PT_THREAD_STATE eq __LINE__ and do { return PT_WAITING if ($cond);
      print "forth\n";
    #PT_WAIT_THREAD(pt, thread) PT_WAIT_WHILE((pt), PT_SCHEDULE(thread))     
    #$thread->{state} = __LINE__;}; $PT_THREAD_STATE eq __LINE__ and do { return PT_WAITING if (execute($thread) == PT_WAITING);
    #  print "fifth\n";
    #PT_EXIT($thread);
    }; $thread->{state} = 0; return PT_EXITED; };
};

#PI_INIT
my $thread = { state => 0 };
my $condition = 1;
my $continue = time+5;
my $print = time+1;

while (execute($thread,($continue - time)<0) ne PT_EXITED) { unless (($print-time) > 0) { print "$thread->{state}\n"; $print = time+1 };};

1;


An einem Perl-source-filter, der die doch eher unhandlichen Ausdrücke auf sowas hier verkürzt, bastel ich grade:

sub execute($$) {
  my ($thread,$cond) = @_;
 
  PT_BEGIN($thread);
  print "first\n";
  PT_YIELD;
  print "second\n";
  PT_YIELD_UNTIL($cond);
  print "third\n";
  PT_EXIT($thread);
};

#PI_INIT
my $thread = { state => 0 };
my $condition = 1;
my $continue = time+5;
my $print = time+1;

while (execute($thread,($continue - time)<0) ne PT_EXITED) { unless (($print-time) > 0) { print "$thread->{state}\n"; $print = time+1 };};


Gruß,

Norbert
while (!asleep()) {sheep++};

Markus Bloch

Hallo Loredo,

ich kann dir das Modul 32_speedtest.pm empfehlen. Dies veranschaulicht auf simple Weise, wie man mittels Blocking.pm solche langsammen Polling-Requests via Forking vom Hauptprozess entkoppeln kann.

Das Modul Blocking.pm ermöglicht es eine lang dauernde Funktion in einem Kindprozess via Forking abzuarbeiten und das Resultat an den Hauptprozess (FHEM) wieder zurückzugeben.

Bei 32_speedtest.pm ist es der Aufruf der speedtest-cli zum Testen der Internetverbindung der einige Sekunden in Anspruch nimmt

Dazu findest du im Wiki auch einen entsprechenden Artikel: http://www.fhemwiki.de/wiki/Blocking_Call

Viele Grüße

Markus

Developer für Module: YAMAHA_AVR, YAMAHA_BD, FB_CALLMONITOR, FB_CALLLIST, PRESENCE, Pushsafer, LGTV_IP12, version

aktives Mitglied des FHEM e.V. (Technik)