Interprocess publish/subscribe channels
    use IPC::PubSub;

    # A new message bus with the DBM::Deep backend
    # (Other possible backends include Memcached and PlainHash)
    my $bus = IPC::PubSub->new(DBM_Deep => '/tmp/pubsub.db');

    # A channel is any arbitrary string
    my $channel = '#perl6';

    # Register a new publisher (you can publish to multiple channels)
    my $pub = $bus->new_publisher("#perl6", "#moose");

    # Publish a message (may be a complex object) to those channels
    $pub->msg("This is a message");

    # Register a new subscriber (you can subscribe to multiple channels)
    my $sub = $bus->new_subscriber("#moose");

    # Publish an object to channels
    $pub->msg("This is another message");

    # Set all subsequent messages from this publisher to expire in 30 seconds
    $pub->expiry(30);
    $pub->msg("This message will go away in 30 seconds");

    # Simple get: Returns the messages sent since the previous get,
    # but only for the first channel.
    my @msgs = $sub->get;

    # Simple get, with an explicit channel key (must be among the ones
    # it initially subscribed to)
    my @moose_msgs = $sub->get("#moose");

    # Complex get: Returns a hash reference from channels to array
    # references of [timestamp, message].
    my $hash_ref = $sub->get_all;

    # Changing the list of channels we subscribe to
    $sub->subscribe('some-other-channel');
    $sub->unsubscribe('some-other-channel');

    # Changing the list of channels we publish to
    $pub->publish('some-other-channel');
    $pub->unpublish('some-other-channel');

    # Listing and checking if we are in a channel
    my @sub_channels = $sub->channels;
    my @pub_channels = $pub->channels;
    print "Sub is in #moose" if $sub->channels->{'#moose'};
    print "Pub is in #moose" if $pub->channels->{'#moose'};

    # Raw cache manipulation APIs (not advised; use ->modify instead)
    $bus->lock('channel');
    $bus->unlock('channel');
    my @timed_msgs = $bus->fetch('key1', 'key2', 'key3');
    $bus->store('key', 'value', time, 30);

    # Atomic updating of cache content; $_ is stored back at the
    # end of the callback.
    my $rv = $bus->modify('key' => sub { delete $_->{foo} });

    # Shorthand for $bus->modify('key' => sub { $_ = 'val' });
    $bus->modify('key' => 'val');

    # Shorthand for $bus->modify('key' => sub { $_ });
    $bus->modify('key');

    # Disconnect the backend connection explicitly
    $bus->disconnect;
This module provides a simple API for publishing messages to channels and for subscribing to them.
When a message is published on a channel, all subscribers currently in that channel will get it on their next "get" or "get_all" call.
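The delivery rule above can be sketched with a short script. This is only an illustration, not part of the distribution: it assumes that the "PlainHash" backend can be selected by passing its name alone to "new" (the constructor form shown elsewhere in this page takes a file path for DBM_Deep), and the channel name and message texts are made up.

```perl
use strict;
use warnings;
use IPC::PubSub;

# Assumption: the single-process PlainHash backend needs no arguments
# beyond its name.
my $bus = IPC::PubSub->new('PlainHash');

my $pub = $bus->new_publisher('#moose');

# Published before anyone has subscribed; going by the description,
# a later subscriber should not receive it.
$pub->msg('sent before anyone subscribed');

my $sub = $bus->new_subscriber('#moose');
$pub->msg('first');
$pub->msg('second');

# Messages published on #moose since this subscriber joined
# (or since its previous get).
my @msgs = $sub->get('#moose');
print "get: $_\n" for @msgs;

$pub->msg('third');

# get_all maps each channel to an array reference of
# [timestamp, message] pairs.
my $all = $sub->get_all;
for my $channel (sort keys %$all) {
    for my $entry (@{ $all->{$channel} }) {
        my ($timestamp, $message) = @$entry;
        print "get_all: $channel => $message\n";
    }
}
```

Because each subscriber keeps its own read position, a second subscriber created at the same point as $sub would be expected to see the same messages independently of the first.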
Currently, it offers four backends: "DBM_Deep" for on-disk storage, "Memcached" for possibly multi-host storage, "Jifty::DBI" for database-backed storage, and "PlainHash" for single-process storage.
Please see the tests in t/ for this distribution, as well as "SYNOPSIS" above, for some usage examples; detailed documentation is not yet available.
See also IPC::DirQueue, in which the subscribers divide the published messages among themselves, so that different subscribers never see the same message.
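The difference between the two delivery models can be pictured with a toy in-memory script. It uses neither module and says nothing about how either is implemented; the subscriber names, the message names, and the round-robin split are all invented for illustration.

```perl
use strict;
use warnings;

# Toy model only: plain Perl data structures, no IPC involved.
my @published   = ('job-1', 'job-2', 'job-3', 'job-4');
my @subscribers = ('alice', 'bob');    # hypothetical subscriber names

# Publish/subscribe: every subscriber receives its own copy of
# every published message.
my %fanout = map { $_ => [@published] } @subscribers;

# Work queue: each message is handed to exactly one subscriber
# (round-robin here, purely as an example of one way to divide them).
my %queue = map { $_ => [] } @subscribers;
for my $i (0 .. $#published) {
    my $who = $subscribers[ $i % @subscribers ];
    push @{ $queue{$who} }, $published[$i];
}

for my $who (@subscribers) {
    printf "%-5s pubsub: %s | queue: %s\n",
        $who,
        join(',', @{ $fanout{$who} }),
        join(',', @{ $queue{$who} });
}
```

In this model both subscribers see all four messages under publish/subscribe, while under the queue model alice handles job-1 and job-3 and bob handles job-2 and job-4.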
Audrey Tang <[email protected]>
Copyright 2006, 2007 by Audrey Tang <[email protected]>.
This software is released under the MIT license cited below.
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.