Assure1::MessageBus::Client Perl library¶
Description¶
Implements the client methods that a process needs in order to to interact with the message bus.
Synopsis¶
use Assure1::MessageBus::Client;
my $MBClient = Assure1::MessageBus::Client($baseDir, \[{OPTIONS}\]);
my $response = $mbClient->ClientRequest(METHOD, \[ARGUMENTS\]);
Constructor¶
new¶
Create new client for sending and receiving messages
new(\%options)
Options
log -> Assure1::Log instance, omitt to disable logging
config -> Assure1::Config instance
workqueue -> Optional reference to a Thread::Queue to send incoming messages to for processing, if not given will ignore any incoming broker messages that are not replies.
Incoming messages are queued in the following frozen format (use Storable):
body - message body; this can contain any number of formats such as a hash, array, or string
type - message type (either "direct" or "failover")
rawmessage - raw message info including message properties; use when sending replies through reply() as 'orig_msg'
bindings -> Additional exchanges to bind to (e.g. EX_THRESHOLD)
timeout -> Time to wait for response before timing out. Default 30 seconds
server -> Specific broker to talk to; should be the FQDN. Defaults to to local hostfqdn
blocking -> set whether to block on ClientRequest and GetResponse. Defaults to 1
Returns
Assure1::MessageBus::Client object
Synopsis
my $MBQueue = new Thread::Queue;
my $MBClient = Assure1::MessageBus::Client->new({
log => $Log,
config => $config,
workqueue => \$MBQueue
});
threads->new(sub {
# if we can't start the handler thread we want to kill the app
threads->set_thread_exit_only(0);
eval {
local $SIG{__DIE__};
$MBClient->handle();
};
if ($@) {
$Self->{Log}->Message('FATAL', '{SIGDIE}: ' . $@);
}
})->detach();
Methods¶
ClientRequest¶
Generic client request to call an RPC method with arguments.
ClientRequest(\%options)
Options
method - RPC method to call
arguments - optional arguments to pass with the request
server - optional recipient server, defaults to the value of server() which defaults to local Assure1 broker
blocking - optional returns after sending request without waiting for response. Defaults to current blocking() setting. Response can be retrieved later with a GetResponse() if necessary.
Returns
Hashref containing the following response keys:
Success -> 1 for Success, 0 for Failure, or -1 for in non-blocking mode and no response= yet
Message -> RPC response message, error text if failed
Data -> Data returned or empty array if nothing. If blocking = 1 returns the message ID with which to use with a subsequent GetResponse() call
GetResponse¶
GetResponse to a given original message.
GetResponse($MessageID, $Blocking)
Arguments
MessageID - MessageID to check response for
Blocking - optional: 1 to wait for response, 0 to return data if ready or without waiting, defaults to current blocking() setting
Returns
Hashref containing the following response keys:
Success -> 1 for Success, 0 for Failure, or -1 when in non-blocking mode and no response yet
Message -> RPC response data, error text if failed, or if blocking = 1 returns the message ID
Synopsis
# Sending a message to local broker
Broker::Client->Reply({
orig_msg => $msg,
reply => {success => 1, message => 'returning 2 items', data => [1,2]}
});
Reply¶
Reply to a given original message.
Reply(\%options)
Options
reply - Reply hash, gets converted to a JSON string. must contain the following keys:
success - boolean success or failure
message - optional message, if success=false, this should be the error message and data should be blank
data - array of data to return if any; items can be any data type except blessed objects, even hashes or arrays.
orig_msg - Original message information the reply is for
Synopsis
# Sending a message to local broker
Broker::Client->Reply({
orig_msg => $msg,
reply => {success => 1, message => 'returning 2 items', data => [1,2]}
});
SendHealthMetric¶
Send a health metric to InfluxDB via Telegraf
SendHealthMetric(\%options)
Options
measurement -> Metric Type Name
tags -> hash reference of:
host -> (optional) Device Host (defaults to local HostFQDN)
instanceName -> Instance Name (e.g. Application Name)
fields -> hash reference of:
value -> value to send
utilization -> (optional) Utilization of value if a max value is known
epochTime -> (optional) Epoch time collected (defaults to time()). Strongly recommend using the epoch from time() rounded to nearest poll time
SendThresholdViolation¶
Queue a threshold violation.
SendThresholdViolation(\%options)
Options
ThresholdType -> PollerThresholds.ThresholdTypeID (1 => Standard, 10 => Trend % Change, 14 => Abnormal, 16 => Missing Data, 20 => Trend Prediction)
ThresholdName -> PollerThresholds.ThresholdName
ThresholdMessage -> PollerThresholds.Message
ThresholdTimeRange -> PollerThresholds.TimeRange
ThresholdOperator -> PollerThresholds.Warning|CriticalOperator
ThresholdValue -> PollerThresholds.Warning|CriticalValue
ThresholdSeverity -> PollerThresholds.Warning|CriticalSeverity
Time -> EpochTimeStamp
Measurement -> PollerThresholds.Measurement
MetricValue -> InfluxDB measurement Mean value|availability|utilization (from MetricField)
DeviceName -> InfluxDB measurement deviceName
InstanceName -> InfluxDB measurement instance
DeviceZoneID -> InfluxDB measurement devicezone
blocking¶
Get current block status or set whether to block waiting for responses.
blocking([$block])
clusterID¶
Get current clusterID or set clusterID for clustered polling/load balancing. Should be the FailoverName given when joining a pool
clusterID([$clusterID])
close¶
Close the current connection to the message bus
close()
server¶
Get current server or set server to send any ClientRequests to. Should be the Assure1 broker FQDN.
server([$server])
workqueue¶
Get current workqueue or set workqueue. Should be a reference to a Thread::Queue
workqueue([$workqueue])