You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
554 lines
15 KiB
554 lines
15 KiB
/*
|
|
|
|
Copyright (C) 2000,2001 Stefan Westerfeld
|
|
stefan@space.twc.de
|
|
|
|
This library is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU Library General Public
|
|
License as published by the Free Software Foundation; either
|
|
version 2 of the License, or (at your option) any later version.
|
|
|
|
This library is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
Library General Public License for more details.
|
|
|
|
You should have received a copy of the GNU Library General Public License
|
|
along with this library; see the file COPYING.LIB. If not, write to
|
|
the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
|
|
Boston, MA 02110-1301, USA.
|
|
|
|
*/
|
|
|
|
#include <iostream>
|
|
#include "asyncschedule.h"
|
|
|
|
using namespace std;
|
|
using namespace Arts;
|
|
|
|
#include "debug.h"
|
|
#include <stdio.h>
|
|
|
|
/* Since this file is a tad bit more complex, here is some basic documentation:
|
|
|
|
1) ASyncPort: There are asynchronous ports which are parts of the standard-
|
|
flowsystem schedule nodes. Their lifetime starts whenever an asynchronous
|
|
stream gets created by the flow system, and ends when the schedule node
|
|
gets destroyed. Basically, an ASyncPort has two functions:
|
|
|
|
* it is a "Port", which means that it gets connect(), disconnect() and
|
|
other calls from the flowsystem
|
|
|
|
* it is a "GenericDataChannel", which means that DataPackets can interact
|
|
with it
|
|
|
|
Although there will be ASyncPorts which only send data and ASyncPorts which
|
|
only receive data (there are none that do both), there are no distinct
|
|
classes for this.
|
|
|
|
2) Standard case: a DataPacket that gets transported over a datachannel locally:
|
|
|
|
1. the user allocates himself a datapacket "packet"
|
|
2. the user calls "packet->send()", which in turn calls
|
|
ASyncPort::sendPacket(packet)
|
|
3. the ASyncPort sends the DataPacket to every subscriber (incrementing the
|
|
useCount) over the NotificationManager
|
|
4. the NotificationManager delivers the DataPackets to the receiver
|
|
5. eventually, the receiver confirms using "packet->processed()"
|
|
6. the packet informs the ASyncPort::processedPacket()
|
|
7. the packet is freed
|
|
|
|
variant (pulling):
|
|
|
|
1. the user gets told by the ASyncPort: produce some data, here is a "packet"
|
|
2. the user calls "packet->send()", which in turn calls
|
|
ASyncPort::sendPacket(packet)
|
|
3. the ASyncPort sends the DataPacket to every subscriber (incrementing the
|
|
useCount) over the NotificationManager
|
|
4. the NotificationManager delivers the DataPackets to the receiver
|
|
5. eventually, the receiver confirms using "packet->processed()"
|
|
6. the packet informs the ASyncPort::processedPacket()
|
|
7. the ASyncPort restarts with 1.
|
|
|
|
3) Remote case: the remote case follows from the local case by adding two extra
|
|
things: one object that converts packets from their packet form to a message
|
|
(ASyncNetSend), and one object that converts packets from the message form
|
|
to a packet again. Effectively, the sending of a single packet looks like
|
|
this, then:
|
|
|
|
1-S. the user allocates himself a datapacket "packet"
|
|
2-S. the user calls "packet->send()", which in turn calls
|
|
ASyncPort::sendPacket(packet)
|
|
3-S. the ASyncPort sends the DataPacket to every subscriber (incrementing the
|
|
useCount) over the NotificationManager
|
|
4-S. the NotificationManager delivers the DataPackets to the ASyncNetSend
|
|
5-S. the ASyncNetSend::notify method gets called, which in turn converts
|
|
the packet to a network message
|
|
|
|
... network transfer ...
|
|
|
|
6-R. the ASyncNetReceive::receive method gets called - the method creates
|
|
a new data packet, and sends it using the NotificationManager again
|
|
7-R. the NotificationManager delivers the DataPacket to the receiver
|
|
8-R. eventually, the receiver confirms using "packet->processed()"
|
|
9-R. the packet informs the ASyncNetReceive::processedPacket() which
|
|
frees the packet and tells the (remote) sender that it went all right
|
|
|
|
... network transfer ...
|
|
|
|
10-S. eventually, ASyncNetSend::processed() gets called, and confirms
|
|
the packet using "packet->processed()"
|
|
11-S. the packet informs the ASyncPort::processedPacket()
|
|
12-S. the packet is freed
|
|
|
|
variant(pulling):
|
|
|
|
works the same as in the local case by exchanging steps 1-S and 12-S
|
|
|
|
4) ownership:
|
|
|
|
* ASyncPort: is owned by the Object which it is a part of, if the object
|
|
dies, ASyncPort dies unconditionally
|
|
|
|
* DataPacket: is owned by the GenericDataChannel they are propagated over,
|
|
that is, the ASyncPort normally - however if the DataPacket is still in
|
|
use (i.e. in state 5 of the local case), it will take responsibility to
|
|
free itself once all processed() calls have been collected
|
|
|
|
* ASyncNetSend, ASyncNetReceive: own each other, so that if the sender dies,
|
|
the connection will die as well, and if the receiver dies, the same happens
|
|
|
|
*/
|
|
|
|
#undef DEBUG_ASYNC_TRANSFER
|
|
|
|
ASyncPort::ASyncPort(const std::string& name, void *ptr, long flags,
|
|
StdScheduleNode* parent) : Port(name, ptr, flags, parent), pull(false)
|
|
{
|
|
stream = (GenericAsyncStream *)ptr;
|
|
stream->channel = this;
|
|
stream->_notifyID = notifyID = parent->object()->_mkNotifyID();
|
|
}
|
|
|
|
ASyncPort::~ASyncPort()
|
|
{
|
|
/*
|
|
* tell all outstanding packets that we don't exist any longer, so that
|
|
* if they feel like they need to confirm they have been processed now,
|
|
* they don't talk to an no longer existing object about it
|
|
*/
|
|
while(!sent.empty())
|
|
{
|
|
sent.front()->channel = 0;
|
|
sent.pop_front();
|
|
}
|
|
|
|
/* disconnect remote connections (if present): the following things will
|
|
* need to be ensured here, since we are being deleted:
|
|
*
|
|
* - the senders should not talk to us after our destructor
|
|
* - all of our connections need to be disconnected
|
|
* - every connection needs to be closed exactly once
|
|
*
|
|
* (closing a connection can cause reentrancy due to mcop communication)
|
|
*/
|
|
while(!netSenders.empty())
|
|
netSenders.front()->disconnect();
|
|
|
|
FlowSystemReceiver receiver = netReceiver;
|
|
if(!receiver.isNull())
|
|
receiver.disconnect();
|
|
}
|
|
|
|
//-------------------- GenericDataChannel interface -------------------------
|
|
|
|
void ASyncPort::setPull(int packets, int capacity)
|
|
{
|
|
pullNotification.receiver = parent->object();
|
|
pullNotification.ID = notifyID;
|
|
pullNotification.internal = 0;
|
|
pull = true;
|
|
|
|
for(int i=0;i<packets;i++)
|
|
{
|
|
GenericDataPacket *packet = stream->createPacket(capacity);
|
|
packet->useCount = 0;
|
|
pullNotification.data = packet;
|
|
NotificationManager::the()->send(pullNotification);
|
|
}
|
|
}
|
|
|
|
void ASyncPort::endPull()
|
|
{
|
|
pull = false;
|
|
// TODO: maybe remove all pending pull packets here
|
|
}
|
|
|
|
void ASyncPort::processedPacket(GenericDataPacket *packet)
|
|
{
|
|
int count = 0;
|
|
list<GenericDataPacket *>::iterator i = sent.begin();
|
|
while(i != sent.end())
|
|
{
|
|
if(*i == packet)
|
|
{
|
|
count++;
|
|
i = sent.erase(i);
|
|
}
|
|
else i++;
|
|
}
|
|
assert(count == 1);
|
|
|
|
#ifdef DEBUG_ASYNC_TRANSFER
|
|
cout << "port::processedPacket" << endl;
|
|
#endif
|
|
assert(packet->useCount == 0);
|
|
if(pull)
|
|
{
|
|
pullNotification.data = packet;
|
|
NotificationManager::the()->send(pullNotification);
|
|
}
|
|
else
|
|
{
|
|
stream->freePacket(packet);
|
|
}
|
|
}
|
|
|
|
void ASyncPort::sendPacket(GenericDataPacket *packet)
|
|
{
|
|
bool sendOk = false;
|
|
|
|
#ifdef DEBUG_ASYNC_TRANSFER
|
|
cout << "port::sendPacket" << endl;
|
|
#endif
|
|
|
|
if(packet->size > 0)
|
|
{
|
|
vector<Notification>::iterator i;
|
|
for(i=subscribers.begin(); i != subscribers.end(); i++)
|
|
{
|
|
Notification n = *i;
|
|
n.data = packet;
|
|
packet->useCount++;
|
|
#ifdef DEBUG_ASYNC_TRANSFER
|
|
cout << "sending notification " << n.ID << endl;
|
|
#endif
|
|
NotificationManager::the()->send(n);
|
|
sendOk = true;
|
|
}
|
|
}
|
|
|
|
if(sendOk)
|
|
sent.push_back(packet);
|
|
else
|
|
stream->freePacket(packet);
|
|
}
|
|
|
|
//----------------------- Port interface ------------------------------------
|
|
|
|
void ASyncPort::connect(Port *xsource)
|
|
{
|
|
arts_debug("port(%s)::connect",_name.c_str());
|
|
|
|
ASyncPort *source = xsource->asyncPort();
|
|
assert(source);
|
|
addAutoDisconnect(xsource);
|
|
|
|
Notification n;
|
|
n.receiver = parent->object();
|
|
n.ID = notifyID;
|
|
n.internal = 0;
|
|
source->subscribers.push_back(n);
|
|
}
|
|
|
|
void ASyncPort::disconnect(Port *xsource)
|
|
{
|
|
arts_debug("port::disconnect");
|
|
|
|
ASyncPort *source = xsource->asyncPort();
|
|
assert(source);
|
|
removeAutoDisconnect(xsource);
|
|
|
|
// remove our subscription from the source object
|
|
vector<Notification>::iterator si;
|
|
for(si = source->subscribers.begin(); si != source->subscribers.end(); si++)
|
|
{
|
|
if(si->receiver == parent->object())
|
|
{
|
|
source->subscribers.erase(si);
|
|
return;
|
|
}
|
|
}
|
|
|
|
// there should have been exactly one, so this shouldn't be reached
|
|
assert(false);
|
|
}
|
|
|
|
ASyncPort *ASyncPort::asyncPort()
|
|
{
|
|
return this;
|
|
}
|
|
|
|
GenericAsyncStream *ASyncPort::receiveNetCreateStream()
|
|
{
|
|
return stream->createNewStream();
|
|
}
|
|
|
|
NotificationClient *ASyncPort::receiveNetObject()
|
|
{
|
|
return parent->object();
|
|
}
|
|
|
|
long ASyncPort::receiveNetNotifyID()
|
|
{
|
|
return notifyID;
|
|
}
|
|
|
|
// Network transparency
|
|
void ASyncPort::addSendNet(ASyncNetSend *netsend)
|
|
{
|
|
Notification n;
|
|
n.receiver = netsend;
|
|
n.ID = netsend->notifyID();
|
|
n.internal = 0;
|
|
subscribers.push_back(n);
|
|
netSenders.push_back(netsend);
|
|
}
|
|
|
|
void ASyncPort::removeSendNet(ASyncNetSend *netsend)
|
|
{
|
|
arts_return_if_fail(netsend != 0);
|
|
netSenders.remove(netsend);
|
|
|
|
vector<Notification>::iterator si;
|
|
for(si = subscribers.begin(); si != subscribers.end(); si++)
|
|
{
|
|
if(si->receiver == netsend)
|
|
{
|
|
subscribers.erase(si);
|
|
return;
|
|
}
|
|
}
|
|
arts_warning("Failed to remove ASyncNetSend (%p) from ASyncPort", netsend);
|
|
}
|
|
|
|
void ASyncPort::setNetReceiver(ASyncNetReceive *receiver)
|
|
{
|
|
arts_return_if_fail(receiver != 0);
|
|
|
|
FlowSystemReceiver r = FlowSystemReceiver::_from_base(receiver->_copy());
|
|
netReceiver = r;
|
|
}
|
|
|
|
void ASyncPort::disconnectRemote(const string& dest)
|
|
{
|
|
list<ASyncNetSend *>::iterator i;
|
|
|
|
for(i = netSenders.begin(); i != netSenders.end(); i++)
|
|
{
|
|
if((*i)->dest() == dest)
|
|
{
|
|
(*i)->disconnect();
|
|
return;
|
|
}
|
|
}
|
|
arts_warning("failed to disconnect %s in ASyncPort", dest.c_str());
|
|
}
|
|
|
|
ASyncNetSend::ASyncNetSend(ASyncPort *ap, const std::string& dest) : ap(ap)
|
|
{
|
|
_dest = dest;
|
|
ap->addSendNet(this);
|
|
}
|
|
|
|
ASyncNetSend::~ASyncNetSend()
|
|
{
|
|
while(!pqueue.empty())
|
|
{
|
|
pqueue.front()->processed();
|
|
pqueue.pop();
|
|
}
|
|
if(ap)
|
|
{
|
|
ap->removeSendNet(this);
|
|
ap = 0;
|
|
}
|
|
}
|
|
|
|
long ASyncNetSend::notifyID()
|
|
{
|
|
return 1;
|
|
}
|
|
|
|
void ASyncNetSend::notify(const Notification& notification)
|
|
{
|
|
// got a packet?
|
|
assert(notification.ID == notifyID());
|
|
GenericDataPacket *dp = (GenericDataPacket *)notification.data;
|
|
pqueue.push(dp);
|
|
|
|
/*
|
|
* since packets are delivered asynchronously, and since disconnection
|
|
* involves communication, it might happen that we get a packet without
|
|
* actually being connected any longer - in that case, silently forget it
|
|
*/
|
|
if(!receiver.isNull())
|
|
{
|
|
// put it into a custom data message and send it to the receiver
|
|
Buffer *buffer = receiver._allocCustomMessage(receiveHandlerID);
|
|
dp->write(*buffer);
|
|
receiver._sendCustomMessage(buffer);
|
|
}
|
|
}
|
|
|
|
void ASyncNetSend::processed()
|
|
{
|
|
assert(!pqueue.empty());
|
|
pqueue.front()->processed();
|
|
pqueue.pop();
|
|
}
|
|
|
|
void ASyncNetSend::setReceiver(FlowSystemReceiver newReceiver)
|
|
{
|
|
receiver = newReceiver;
|
|
receiveHandlerID = newReceiver.receiveHandlerID();
|
|
}
|
|
|
|
void ASyncNetSend::disconnect()
|
|
{
|
|
/* since disconnection will cause destruction (most likely immediate),
|
|
* we'll reference ourselves ... */
|
|
_copy();
|
|
|
|
if(!receiver.isNull())
|
|
{
|
|
FlowSystemReceiver r = receiver;
|
|
receiver = FlowSystemReceiver::null();
|
|
r.disconnect();
|
|
}
|
|
if(ap)
|
|
{
|
|
ap->removeSendNet(this);
|
|
ap = 0;
|
|
}
|
|
|
|
_release();
|
|
}
|
|
|
|
string ASyncNetSend::dest()
|
|
{
|
|
return _dest;
|
|
}
|
|
|
|
/* dispatching function for custom message */
|
|
|
|
static void _dispatch_ASyncNetReceive_receive(void *object, Buffer *buffer)
|
|
{
|
|
((ASyncNetReceive *)object)->receive(buffer);
|
|
}
|
|
|
|
ASyncNetReceive::ASyncNetReceive(ASyncPort *port, FlowSystemSender sender)
|
|
{
|
|
port->setNetReceiver(this);
|
|
stream = port->receiveNetCreateStream();
|
|
stream->channel = this;
|
|
this->sender = sender;
|
|
/* stream->_notifyID = _mkNotifyID(); */
|
|
|
|
gotPacketNotification.ID = port->receiveNetNotifyID();
|
|
gotPacketNotification.receiver = port->receiveNetObject();
|
|
gotPacketNotification.internal = 0;
|
|
_receiveHandlerID =
|
|
_addCustomMessageHandler(_dispatch_ASyncNetReceive_receive,this);
|
|
}
|
|
|
|
ASyncNetReceive::~ASyncNetReceive()
|
|
{
|
|
/* tell outstanding packets that we don't exist any longer */
|
|
while(!sent.empty())
|
|
{
|
|
sent.front()->channel = 0;
|
|
sent.pop_front();
|
|
}
|
|
delete stream;
|
|
}
|
|
|
|
long ASyncNetReceive::receiveHandlerID()
|
|
{
|
|
return _receiveHandlerID;
|
|
}
|
|
|
|
void ASyncNetReceive::receive(Buffer *buffer)
|
|
{
|
|
GenericDataPacket *dp = stream->createPacket(512);
|
|
dp->read(*buffer);
|
|
dp->useCount = 1;
|
|
gotPacketNotification.data = dp;
|
|
NotificationManager::the()->send(gotPacketNotification);
|
|
sent.push_back(dp);
|
|
}
|
|
|
|
/*
|
|
* It will happen that this routine is called in time critical situations,
|
|
* such as: while audio calculation is running, and must be finished in
|
|
* time. The routine is mostly harmless, because sender->processed() is
|
|
* a oneway function, which just queues the buffer for sending and returns
|
|
* back, so it should return at once.
|
|
*
|
|
* However there is an exception upon first call: when sender->processed()
|
|
* is called for the first time, the method processed has still to be looked
|
|
* up. Thus, a synchronous call to _lookupMethod is made. That means, upon
|
|
* first call, the method will send out an MCOP request and block until the
|
|
* remote process tells that id.
|
|
*/
|
|
void ASyncNetReceive::processedPacket(GenericDataPacket *packet)
|
|
{
|
|
/*
|
|
* HACK! Upon disconnect, strange things will happen. One of them is
|
|
* that we might, for the reason of not being referenced any longer,
|
|
* cease to exist without warning. Another is that our nice "sender"
|
|
* reference will get a null reference without warning, see disconnect
|
|
* code (which will cause the attached stub to also disappear). As
|
|
* those objects (especially the stub) are not prepared for not
|
|
* being there any more in the middle of whatever they do, we here
|
|
* explicitly reference us, and them, *again*, so that no evil things
|
|
* will happen. A general solution for this would be garbage collection
|
|
* in a timer, but until this is implemented (if it ever will become
|
|
* implemented), we'll live with this hack.
|
|
*/
|
|
_copy();
|
|
sent.remove(packet);
|
|
stream->freePacket(packet);
|
|
if(!sender.isNull())
|
|
{
|
|
FlowSystemSender xsender = sender;
|
|
xsender.processed();
|
|
}
|
|
_release();
|
|
}
|
|
|
|
void ASyncNetReceive::disconnect()
|
|
{
|
|
if(!sender.isNull())
|
|
{
|
|
FlowSystemSender s = sender;
|
|
sender = FlowSystemSender::null();
|
|
s.disconnect();
|
|
}
|
|
}
|
|
|
|
void ASyncNetReceive::sendPacket(GenericDataPacket *)
|
|
{
|
|
assert(false);
|
|
}
|
|
|
|
void ASyncNetReceive::setPull(int, int)
|
|
{
|
|
assert(false);
|
|
}
|
|
|
|
void ASyncNetReceive::endPull()
|
|
{
|
|
assert(false);
|
|
}
|