Skip to content

Assure1::MessageBus::Client Perl library

Description

Implements the client methods that a process needs in order to to interact with the message bus.

Synopsis

use Assure1::MessageBus::Client;

my $MBClient = Assure1::MessageBus::Client($baseDir, \[{OPTIONS}\]);
my $response = $mbClient->ClientRequest(METHOD, \[ARGUMENTS\]);

Constructor

new

Create new client for sending and receiving messages

new(\%options)

Options

log       -> Assure1::Log instance, omitt to disable logging
config    -> Assure1::Config instance
workqueue -> Optional reference to a Thread::Queue to send incoming messages to for processing, if not given will ignore any incoming broker messages that are not replies.
             Incoming messages are queued in the following frozen format (use Storable):
               body          - message body; this can contain any number of formats such as a hash, array, or string
               type          - message type (either "direct" or "failover")
               rawmessage    - raw message info including message properties; use when sending replies through reply() as 'orig_msg'
bindings  -> Additional exchanges to bind to (e.g. EX_THRESHOLD)
timeout   -> Time to wait for response before timing out. Default 30 seconds
server    -> Specific broker to talk to; should be the FQDN. Defaults to to local hostfqdn
blocking  -> set whether to block on ClientRequest and GetResponse. Defaults to 1

Returns

Assure1::MessageBus::Client object

Synopsis

my $MBQueue      = new Thread::Queue;
my $MBClient     = Assure1::MessageBus::Client->new({
    log       => $Log,
    config    => $config,
    workqueue => \$MBQueue
});

threads->new(sub {
    # if we can't start the handler thread we want to kill the app
    threads->set_thread_exit_only(0);

    eval {
        local $SIG{__DIE__};
        $MBClient->handle();
    };

    if ($@) {
        $Self->{Log}->Message('FATAL', '{SIGDIE}: ' . $@);
    }
})->detach();

Methods

ClientRequest

Generic client request to call an RPC method with arguments.

ClientRequest(\%options)

Options

method    - RPC method to call
arguments - optional arguments to pass with the request
server    - optional recipient server, defaults to the value of server() which defaults to local Assure1 broker
blocking  - optional returns after sending request without waiting for response. Defaults to current blocking() setting. Response can be retrieved later with a GetResponse() if necessary.

Returns

Hashref containing the following response keys:
Success -> 1 for Success, 0 for Failure, or -1 for in non-blocking mode and no response= yet
Message -> RPC response message, error text if failed
Data    -> Data returned or empty array if nothing. If blocking = 1 returns the message ID with which to use with a subsequent GetResponse() call

GetResponse

GetResponse to a given original message.

GetResponse($MessageID, $Blocking)

Arguments

MessageID - MessageID to check response for
Blocking  - optional: 1 to wait for response, 0 to return data if ready or without waiting, defaults to current blocking() setting

Returns

Hashref containing the following response keys:
Success -> 1 for Success, 0 for Failure, or -1 when in non-blocking mode and no response yet
Message -> RPC response data, error text if failed, or if blocking = 1 returns the message ID

Synopsis

# Sending a message to local broker
Broker::Client->Reply({
   orig_msg => $msg,
   reply    => {success => 1, message => 'returning 2 items', data => [1,2]}
});

Reply

Reply to a given original message.

Reply(\%options)

Options

reply    - Reply hash, gets converted to a JSON string. must contain the following keys:
             success - boolean success or failure
             message - optional message, if success=false, this should be the error message and data should be blank
             data    - array of data to return if any; items can be any data type except blessed objects, even hashes or arrays.
orig_msg - Original message information the reply is for

Synopsis

# Sending a message to local broker
Broker::Client->Reply({
   orig_msg => $msg,
   reply    => {success => 1, message => 'returning 2 items', data => [1,2]}
});

SendHealthMetric

Send a health metric to InfluxDB via Telegraf

SendHealthMetric(\%options)

Options

measurement      -> Metric Type Name
tags             -> hash reference of:
    host         -> (optional) Device Host (defaults to local HostFQDN)
    instanceName -> Instance Name (e.g. Application Name)
fields           -> hash reference of:
    value        -> value to send
    utilization  -> (optional) Utilization of value if a max value is known
epochTime        -> (optional) Epoch time collected (defaults to time()). Strongly recommend using the epoch from time() rounded to nearest poll time

SendThresholdViolation

Queue a threshold violation.

SendThresholdViolation(\%options)

Options

ThresholdType      -> PollerThresholds.ThresholdTypeID (1 => Standard, 10 => Trend % Change, 14 => Abnormal, 16 => Missing Data, 20 => Trend Prediction)
ThresholdName      -> PollerThresholds.ThresholdName
ThresholdMessage   -> PollerThresholds.Message
ThresholdTimeRange -> PollerThresholds.TimeRange
ThresholdOperator  -> PollerThresholds.Warning|CriticalOperator
ThresholdValue     -> PollerThresholds.Warning|CriticalValue
ThresholdSeverity  -> PollerThresholds.Warning|CriticalSeverity
Time               -> EpochTimeStamp
Measurement        -> PollerThresholds.Measurement
MetricValue        -> InfluxDB measurement Mean value|availability|utilization (from MetricField)
DeviceName         -> InfluxDB measurement deviceName
InstanceName       -> InfluxDB measurement instance
DeviceZoneID       -> InfluxDB measurement devicezone

blocking

Get current block status or set whether to block waiting for responses.

blocking([$block])

clusterID

Get current clusterID or set clusterID for clustered polling/load balancing. Should be the FailoverName given when joining a pool

clusterID([$clusterID])

close

Close the current connection to the message bus

close()

server

Get current server or set server to send any ClientRequests to. Should be the Assure1 broker FQDN.

server([$server])

workqueue

Get current workqueue or set workqueue. Should be a reference to a Thread::Queue

workqueue([$workqueue])