Interprocess publish/subscribe channels
# A new message bus with the DBM::Deep backend
# (Other possible backends include Memcached and PlainHash)
my $bus = IPC::PubSub->new(DBM_Deep => '/tmp/pubsub.db');
# A channel is any arbitrary string
my $channel = '#perl6';
# Register a new publisher (you can publish to multiple channels)
my $pub = $bus->new_publisher("#perl6", "#moose");
# Publish a message (may be a complex object) to those channels
$pub->msg("This is a message");
# Register a new subscriber (you can subscribe to multiple channels)
my $sub = $bus->new_subscriber("#moose");
# Publish an object to channels
$pub->msg("This is another message");
# Set all subsequent messages from this publisher to expire in 30 seconds
$pub->expiry(30);
$pub->msg("This message will go away in 30 seconds");
# Simple get: Returns the messages sent since the previous get,
# but only for the first channel.
my @msgs = $sub->get;
# Simple get, with an explicit channel key (must be among the ones
# it initially subscribed to)
my @moose_msgs = $sub->get("#moose");
# Complex get: Returns a hash reference from channels to array
# references of [timestamp, message].
my $hash_ref = $sub->get_all;
# Changing the list of channels we subscribe to
$sub->subscribe('some-other-channel');
$sub->unsubscribe('some-other-channel');
# Changing the list of channels we publish to
$pub->publish('some-other-channel');
$pub->unpublish('some-other-channel');
# Listing and checking if we are in a channel
my @sub_channels = $sub->channels;
my @pub_channels = $pub->channels;
print "Sub is in #moose" if $sub->channels->{'#moose'};
print "Pub is in #moose" if $pub->channels->{'#moose'};
# Raw cache manipulation APIs (not advised; use ->modify instead)
$bus->lock('channel');
$bus->unlock('channel');
my @timed_msgs = $bus->fetch('key1', 'key2', 'key3');
$bus->store('key', 'value', time, 30);
# Atomic updating of cache content; $_ is stored back on the
# end of the callback.
my $rv = $bus->modify('key' => sub { delete $_->{foo} });
# Shorthand for $bus->modify('key' => sub { $_ = 'val' });
$bus->modify('key' => 'val');
# Shorthand for $bus->modify('key' => sub { $_ });
$bus->modify('key');
# Disconnect the backend connection explicitly
$bus->disconnect;
This module provides a simple \s-1API\s0 for publishing messages to channels and for subscribing to them.
When a message is published on a channel, all subscribers currently in that channel will get it on their next \*(C`get\*(C' or \*(C`get_all\*(C' call.
Currently, it offers four backends: \*(C`DBM_Deep\*(C' for on-disk storage, \*(C`Memcached\*(C' for possibly multi-host storage, \*(C`Jifty::DBI\*(C' for database-backed storage, and \*(C`PlainHash\*(C' for single-process storage.
Please see the tests in t/ for this distribution, as well as \*(L"\s-1SYNOPSIS\s0\*(R" above, for some usage examples; detailed documentation is not yet available.
IPC::DirQueue, where the subscribers divide the published messages among themselves, so different subscribers never see the same message.
Audrey Tang <[email protected]>
Copyright 2006, 2007 by Audrey Tang <[email protected]>.
This software is released under the \s-1MIT\s0 license cited below. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the \*(L"Software\*(R"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
\s-1THE\s0 \s-1SOFTWARE\s0 \s-1IS\s0 \s-1PROVIDED\s0 \*(L"\s-1AS\s0 \s-1IS\s0\*(R", \s-1WITHOUT\s0 \s-1WARRANTY\s0 \s-1OF\s0 \s-1ANY\s0 \s-1KIND\s0, \s-1EXPRESS\s0 \s-1OR\s0 \s-1IMPLIED\s0, \s-1INCLUDING\s0 \s-1BUT\s0 \s-1NOT\s0 \s-1LIMITED\s0 \s-1TO\s0 \s-1THE\s0 \s-1WARRANTIES\s0 \s-1OF\s0 \s-1MERCHANTABILITY\s0, \s-1FITNESS\s0 \s-1FOR\s0 A \s-1PARTICULAR\s0 \s-1PURPOSE\s0 \s-1AND\s0 \s-1NONINFRINGEMENT\s0. \s-1IN\s0 \s-1NO\s0 \s-1EVENT\s0 \s-1SHALL\s0 \s-1THE\s0 \s-1AUTHORS\s0 \s-1OR\s0 \s-1COPYRIGHT\s0 \s-1HOLDERS\s0 \s-1BE\s0 \s-1LIABLE\s0 \s-1FOR\s0 \s-1ANY\s0 \s-1CLAIM\s0, \s-1DAMAGES\s0 \s-1OR\s0 \s-1OTHER\s0 \s-1LIABILITY\s0, \s-1WHETHER\s0 \s-1IN\s0 \s-1AN\s0 \s-1ACTION\s0 \s-1OF\s0 \s-1CONTRACT\s0, \s-1TORT\s0 \s-1OR\s0 \s-1OTHERWISE\s0, \s-1ARISING\s0 \s-1FROM\s0, \s-1OUT\s0 \s-1OF\s0 \s-1OR\s0 \s-1IN\s0 \s-1CONNECTION\s0 \s-1WITH\s0 \s-1THE\s0 \s-1SOFTWARE\s0 \s-1OR\s0 \s-1THE\s0 \s-1USE\s0 \s-1OR\s0 \s-1OTHER\s0 \s-1DEALINGS\s0 \s-1IN\s0 \s-1THE\s0 \s-1SOFTWARE\s0.