A Session
represents a distinct conversation between end
points. They are created from an active (i.e., not closed) Connection.
A Session
is used to acknowledge individual or all messages
that have passed through it
Acknowledges one or more outstanding messages that have been received on this session.
options
- the set of options
:message - if specified, then only that Message is acknowledged
:sync - if true, the call will block until processed by the broker
# acknowledge all received messages session.acknowledge # acknowledge a single message session.acknowledge :message => message # acknowledge all messages, wait until the call finishes session.acknowledge :sync => true
# File lib/qpid_messaging/session.rb, line 148 def acknowledge(options = {}) sync = options[:sync] || false message = options[:message] if options[:message] unless message.nil? @session_impl.acknowledge message.message_impl, sync else @session_impl.acknowledge sync end end
Closes the Session
and all associated Sender
and
Receiver
instances.
NOTE: All Session
instances for a Connection are closed when the Connection is closed. But closing a
Session
does not affect the owning Connection.
# File lib/qpid_messaging/session.rb, line 114 def close; @session_impl.close; end
Commits any pending transactions for a transactional session.
# File lib/qpid_messaging/session.rb, line 117 def commit; @session_impl.commit; end
Returns the Connection associated with this session.
# File lib/qpid_messaging/session.rb, line 41 def connection @connection end
Creates a new endpoint for receiving messages.
The address
can either be an instance Address or else an address string.
address
- the end point address.
# File lib/qpid_messaging/session.rb, line 85 def create_receiver(address) result = nil receiver_impl = nil if address.class == Qpid::Messaging::Address address_impl = address.address_impl receiver_impl = @session_impl.createReceiver address_impl else receiver_impl = @session_impl.createReceiver(address) end Qpid::Messaging::Receiver.new self, receiver_impl end
Creates a new endpoint for sending messages.
The address can either be an instance Address or else an address string.
address
- the end point address.
# File lib/qpid_messaging/session.rb, line 53 def create_sender(address) _address = address if address.class == Qpid::Messaging::Address _address = address.address_impl end sender_impl = @session_impl.createSender(_address) sender_name = sender_impl.getName Qpid::Messaging::Sender.new(self, sender_impl) end
If the Session
has been rendered invalid due to some
exception, this method will result in that exception being raised.
If none have occurred, then no exceptions are raised.
# show any errors that occurred during the Session if @session.errors? begin @session.errors rescue Exception => error puts "An error occurred: #{error}" end end
# File lib/qpid_messaging/session.rb, line 257 def errors; @session_impl.checkError; end
Returns true if there were exceptions on this session.
# File lib/qpid_messaging/session.rb, line 240 def errors?; @session_impl.hasError; end
Fetches the next Receiver with a message pending. Waits the specified number of milliseconds before timing out.
For a Receiver to be returned, it must have a capacity > 0 and have Messages locally queued.
If no Receiver is found within the time out period, then a MessageError is raised.
timeout
- the duration
loop do begin # wait a maximum of one minute for the next receiver to be ready recv = session.next_receiver Qpid::Messaging::Duration::MINUTE # get and dispatch the message msg = recv.get dispatch_message msg rescue puts "No receivers were returned" end end
# File lib/qpid_messaging/session.rb, line 228 def next_receiver(timeout = Qpid::Messaging::Duration::FOREVER, &block) receiver_impl = @session_impl.nextReceiver(timeout.duration_impl) unless receiver_impl.nil? recv = Qpid::Messaging::Receiver.new self, receiver_impl block.call recv unless block.nil? end return recv end
Returns the total number of receivable messages, and messages already
received, by Receiver instances associated with
this Session
.
# File lib/qpid_messaging/session.rb, line 191 def receivable; @session_impl.getReceivable; end
Rejects the specified message. A rejected message will not be redelivered.
NOTE: A message cannot be rejected once it has been acknowledged.
# File lib/qpid_messaging/session.rb, line 163 def reject(message); @session_impl.reject message.message_impl; end
Releases the message, which allows the broker to attempt to redeliver it.
NOTE: A message connot be released once it has been acknowled.
# File lib/qpid_messaging/session.rb, line 169 def release(message); @session_impl.release message.message_impl; end
Rolls back any uncommitted transactions on a transactional session.
# File lib/qpid_messaging/session.rb, line 120 def rollback; @session_impl.rollback; end
Requests synchronization with the broker.
options
- the list of options
:block
- if true, the call blocks until the broker
acknowledges it
# File lib/qpid_messaging/session.rb, line 184 def sync(args = {}) block = args[:block] || false @session_impl.sync block end
Returns the number of messages that have been acknowledged by this
Session
whose acknowledgements have not been confirmed as
processed by the broker.
# File lib/qpid_messaging/session.rb, line 196 def unsettled_acks; @session_impl.getUnsettledAcks; end