Websocket
FHEM kann über Websockets kommunizieren, indem DevIo genutzt wird [1]. Bisher lassen sich Websockets nur über Perl-Befehle ansprechen.
Schritte:
- Setzen der Parameter:
- Setzen des Kommunikationsendpunktes:
$hash->{DeviceName} = "wss:echo.websocket.org:443/pfad";
Die Portnummer muss zwingend angegeben werden, sobald ein Pfad spezifiziert wird.
- Optional: Setzen von speziellen Headerangaben:
$hash->{header}{'Sec-WebSocket-Protocol'} = 'graphql-transport-ws';
- Setzen einer CallBack Funktion:
- Wenn Daten vom Websocket empfangen werden, werden diese an eine Funktion übergeben. Diese Funktion macht man über directReadFn bekannt:
$hash->{directReadFn} = sub { ... };
- Starten der Kommunikation:
DevIo_OpenDev(...);
Beispiele
Tibber Live-Messdaten auslesen
Folgendes Beispiel liest Strommesswerte des Anbieters Tibber aus. Mit "set Tibber.ws start" wird die Verbindung aufgebaut und mit "set Tibber.ws stop" wieder getrennt. Siehe auch den zugehörigen Forums-Thread.
# Dummy device that hosts the Tibber websocket example. The connection logic itself
# lives in the userReadings attribute of this device.
defmod Tibber.ws dummy
# Custom attributes that the userReadings code reads via AttrVal().
attr Tibber.ws userattr websocketURL homeId token myId minInterval
attr Tibber.ws alias Tibber Websocket
attr Tibber.ws event-on-change-reading .*
# Maps the commands "start" and "stop" to "cmd connect" and "cmd disconnect".
attr Tibber.ws eventMap /cmd connect:start/cmd disconnect:stop/
# NOTE(review): homeId and token below look like sample credentials - presumably they
# have to be replaced with your own values (TODO confirm).
attr Tibber.ws homeId 96a14971-525a-4420-aae9-e5aedaa129ff
attr Tibber.ws icon hue_filled_plug
# Lower bound for the age of websocketData before another live measurement frame is
# stored (compared against ReadingsAge, presumably in seconds - verify).
attr Tibber.ws minInterval 30
# Used as the "id" of the subscribe message sent after the connection ack.
attr Tibber.ws myId TorxgewindeID
attr Tibber.ws readingList cmd
attr Tibber.ws setList cmd
attr Tibber.ws stateFormat payload_data_liveMeasurement_accumulatedCost payload_data_liveMeasurement_currency (payload_data_liveMeasurement_power W, Import: payload_data_liveMeasurement_accumulatedConsumption kWh, Export: payload_data_liveMeasurement_accumulatedProduction kWh)
attr Tibber.ws token 5K4MVS-OjfWhK_4yrjOlFe1F6kJXPVf7eQYggo8ebAE
attr Tibber.ws userReadings connect:cmd:.connect {\
  # Triggered by events matching "cmd:.connect": configures the DevIo websocket\
  # parameters and the read callback and then opens the connection.\
  # The value of this reading is the time of the attempt.\
  my $dev = $defs{$name};;\
  return "Device already open" if (defined(DevIo_IsOpen($dev)));;\
\
  # endpoint in DevIo notation - the port is mandatory as soon as a path is given\
  $dev->{DeviceName} = AttrVal($name, "websocketURL", "wss:echo.websocket.org:443");;\
\
  # extra handshake headers needed for Tibber (compare the developer tools of a browser)\
  my %tibberHeader = (\
    'Sec-WebSocket-Protocol' => 'graphql-transport-ws',\
    'Host'                   => 'websocket-api.tibber.com',\
    'Origin'                 => 'https://developer.tibber.com'\
  );;\
  $dev->{header}{$_} = $tibberHeader{$_} for (keys %tibberHeader);;\
\
  # called whenever select() signals data for this device - websocket ping/pong\
  # frames are handled inside DevIo but still end up in this callback\
  $dev->{directReadFn} = sub {\
    my $conn = $defs{$name};;\
\
    # select() already signalled data so a plain read is sufficient here\
    my $data = DevIo_SimpleRead($conn);;\
\
    # a failed read closes the device and publishes a marker value instead\
    unless (defined($data)) {\
      DevIo_CloseDev($conn);;\
      $data = "not_connected";;\
    }\
\
    #Log(3, "$name:$reading: websocket data: >>>$data<<<");;\
\
    # empty buffers are ignored - live measurement frames are stored only if the\
    # previous value is older than minInterval while other frames are always stored\
    if ($data ne "") {\
      my $isMeasurement = ($data =~ /.*id.*type.*next.*payload.*data.*liveMeasurement.*/s);;\
      my $lastStored = ReadingsAge($name, "websocketData", 3600);;\
      my $throttle = AttrVal($name, "minInterval", 0);;\
      my $store = (!$isMeasurement || $lastStored > $throttle);;\
\
      readingsBeginUpdate($conn);;\
      readingsBulkUpdate($conn, "websocketData", "$data") if ($store);;\
      readingsEndUpdate($conn, 1);;\
    }\
  };;\
\
  # callback handed to DevIo_OpenDev: without an error it sends the\
  # connection_init message carrying the configured token\
  my $onOpen = sub {\
    my ($conn, $err) = @_;;\
    return "$err" if ($err);;\
\
    my $token = AttrVal($name, "token", "???");;\
    my $init = '{"type":"connection_init","payload":{"token":"'.$token.'"}}';;\
    DevIo_SimpleWrite($conn, $init, 2);;\
  };;\
\
  # open the DevIo websocket and reset the data reading\
  DevIo_OpenDev($dev, 0, undef, $onOpen);;\
  readingsBulkUpdate($dev, "websocketData", "");;\
\
  return POSIX::strftime("%H:%M:%S", localtime(time()));;\
},\
disconnect:cmd:.disconnect {\
  # Triggered by events matching "cmd:.disconnect": removes the internal timers\
  # registered with the device hash and closes the DevIo connection.\
  # The value of this reading is the time of the disconnect.\
  my $dev = $defs{$name};;\
  RemoveInternalTimer($dev);;\
\
  # NOTE(review): the read right before closing presumably drains pending data - confirm\
  DevIo_SimpleRead($dev);;\
  DevIo_CloseDev($dev);;\
\
  my $stamp = POSIX::strftime("%H:%M:%S", localtime(time()));;\
  return $stamp;;\
},\
onDisconnect {\
  # No trigger is given, so this is presumably evaluated on every reading update\
  # of the device. As soon as the state is "disconnected" or the read callback\
  # stored "not_connected" a reconnect is scheduled after a random delay.\
  # The value of this reading is the time at which the reconnect was scheduled.\
  my $myState = ReadingsVal($name, "state", "???");;\
  my $myData = ReadingsVal($name, "websocketData", "???");;\
  return if ($myState ne "disconnected" and $myData ne "not_connected");;\
\
  ## timer callback function, called after the delay to initiate a reconnect\
  my $timerFunction = sub {\
    my ($hash) = @_;;\
    my $devState = DevIo_IsOpen($hash);;\
\
    # only re-connect if device is not connected\
    readingsSingleUpdate($hash, "cmd", "connect", 1) if (!defined($devState));;\
  };;\
\
  # Drop every timer that is still pending for this device before queueing a new\
  # one. The callback above is compiled anew on each evaluation, so a removal\
  # keyed on its code reference could never match a timer queued by an earlier\
  # run and reconnect timers would pile up. The disconnect entry clears the\
  # timers of the device hash in the same way.\
  my $hash = $defs{$name};;\
  RemoveInternalTimer($hash);;\
\
  # wait a random time before reconnect (exponential backoff TBD):\
  my $rwait = int(rand(200)) + 30;;\
  InternalTimer(gettimeofday() + $rwait, $timerFunction, $hash);;\
  readingsBulkUpdate($hash, "cmd", "reconnect attempt in $rwait seconds");;\
\
  return POSIX::strftime("%H:%M:%S",localtime(time()));;\
},\
onConnectionAck:websocketData:.*connection_ack.* {\
  # Fires when websocketData contains the string "connection_ack": answers with\
  # the GraphQL subscription for the live measurements of the configured home.\
  # The value of this reading is the time at which the subscription was sent.\
  Log(3, "$name:$reading: got connection ack");;\
\
  # nothing can be sent if the connection got lost in the meantime\
  my $dev = $defs{$name};;\
  return "Device not open" if (!defined(DevIo_IsOpen($dev)));;\
\
  readingsBulkUpdate($dev, "cmd", "got connection ack");;\
\
  my $subscriptionId = AttrVal($name, "myId", "???");;\
  my $home = AttrVal($name, "homeId", "???");;\
\
  # fields requested from liveMeasurement - joined with single blanks below\
  my @fields = qw(\
    timestamp power lastMeterConsumption accumulatedConsumption accumulatedProduction\
    accumulatedProductionLastHour accumulatedCost accumulatedReward currency minPower averagePower maxPower\
    powerProduction powerReactive powerProductionReactive minPowerProduction maxPowerProduction lastMeterProduction\
    powerFactor voltagePhase1 voltagePhase2 voltagePhase3 signalStrength\
  );;\
  my $query = 'subscription { liveMeasurement( homeId: \"'.$home.'\" ) { '.join(' ', @fields).' }}';;\
\
  # the message is still assembled piecewise: a comma at the end of a piece\
  # caused perl errors so the separators are appended as separate strings\
  my $json = '{ "id":"'.$subscriptionId.'", "type":"subscribe"'.", ";;\
  $json .= '"payload":{"variables":{}'.", ";;\
  $json .= '"extensions":{}'.", ";;\
  $json .= '"query":"'.$query.'"}}';;\
\
  # send the string via websocket as ASCII\
  Log(3, "$name:$reading: sending JSON: >>>$json<<<");;\
  DevIo_SimpleWrite($dev, $json, 2);;\
\
  return POSIX::strftime("%H:%M:%S", localtime(time()));;\
},\
onNextLiveMeasurement:websocketData:.*next.*payload.*data.*liveMeasurement.* {\
  # Fires when websocketData contains a "next" frame with live measurement data.\
  # The JSON is converted to name/value pairs (filtered to the names matching\
  # payload_data_liveMeasurement.*) and every pair is written as a reading.\
  # The value of this reading lists the names that were written.\
\
  # Resolve the device hash explicitly like the other userReadings entries do\
  # instead of relying on a variable of that name from the evaluating scope.\
  my $hash = $defs{$name};;\
\
  my $val = ReadingsVal($name, "websocketData", "{}");;\
  my %res = %{json2nameValue($val, undef, undef, "payload_data_liveMeasurement.*")};;\
\
  my $ret = "got values for:\n";;\
  foreach my $k (sort keys %res) {\
    $ret .= "$k\n";;\
    readingsBulkUpdate($hash, makeReadingName($k), $res{$k});;\
  }\
  return $ret;;\
}
# start and stop are the commands that eventMap maps to "cmd connect" and "cmd disconnect".
attr Tibber.ws webCmd start:stop
# Read by the connect userReading and used as the DevIo device name (port is required because a path is given).
attr Tibber.ws websocketURL wss:websocket-api.tibber.com:443/v1-beta/gql/subscriptions