OmniEvents
ProxyPullConsumer.cc
Go to the documentation of this file.
1 // Package : omniEvents
2 // ProxyPullConsumer.cc Created : 2003/12/04
3 // Author : Alex Tingle
4 //
5 // Copyright (C) 2003-2005 Alex Tingle.
6 //
7 // This file is part of the omniEvents application.
8 //
9 // omniEvents is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Lesser General Public
11 // License as published by the Free Software Foundation; either
12 // version 2.1 of the License, or (at your option) any later version.
13 //
14 // omniEvents is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Lesser General Public License for more details.
18 //
19 // You should have received a copy of the GNU Lesser General Public
20 // License along with this library; if not, write to the Free Software
21 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 //
23 
24 #include "ProxyPullConsumer.h"
25 #include "Orb.h"
26 #include "omniEventsLog.h"
27 #include "PersistNode.h"
28 #include <assert.h>
29 
30 namespace OmniEvents {
31 
32 //
33 // ProxyPullConsumerManager
34 //
35 
36 PortableServer::Servant
38  const PortableServer::ObjectId& oid,
39  PortableServer::POA_ptr poa
40 )
41 {
42  DB(20,"ProxyPullConsumerManager::incarnate()")
44  _servants.insert(result);
45  return result;
46 }
47 
49  PortableServer::POA_ptr parentPoa,
50  list<CORBA::Any*>& q
51 )
52 : ProxyManager(parentPoa),
53  _queue(q)
54 {
55  ProxyManager::activate("ProxyPullConsumer");
56 }
57 
59 {
60  DB(20,"~ProxyPullConsumerManager()")
61 }
62 
64 
65 CosEventChannelAdmin::ProxyPullConsumer_ptr
67 {
68  return createNarrowedReference<CosEventChannelAdmin::ProxyPullConsumer>(
69  _managedPoa.in(),
70  CosEventChannelAdmin::_tc_ProxyPullConsumer->id()
71  );
72 }
73 
75 {
76  // Collect events from each servant in turn.
77  for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
78  {
79  ProxyPullConsumer_i* proxy=dynamic_cast<ProxyPullConsumer_i*>(*i);
80  proxy->collect();
81  }
82 }
83 
85 {
86  // Trigger each servant in turn.
87  for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
88  {
89  ProxyPullConsumer_i* proxy=dynamic_cast<ProxyPullConsumer_i*>(*i);
90  proxy->triggerRequest();
91  }
92 }
93 
95 {
96  for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
97  {
98  Proxy* p =*i; // Sun's CC requires this temporary.
99  ProxyPullConsumer_i* ppc =static_cast<ProxyPullConsumer_i*>(p);
100  // We are in the EventChannel's thread.
101  // Make sure all calls go though the ProxyPullConsumer POA.
102  CosEventChannelAdmin::ProxyPullConsumer_var ppcv =ppc->_this();
103  ppcv->disconnect_pull_consumer();
104  }
105 }
106 
107 
108 //
109 // ProxyPullConsumer_i
110 //
111 
112 // CORBA interface methods
113 
115  CosEventComm::PullSupplier_ptr pullSupplier
116 )
117 {
118  if(CORBA::is_nil(pullSupplier))
119  throw CORBA::BAD_PARAM();
120  if(!CORBA::is_nil(_target) || !CORBA::is_nil(_req))
121  throw CosEventChannelAdmin::AlreadyConnected();
122  _target=CosEventComm::PullSupplier::_duplicate(pullSupplier);
123 
125  {
126  WriteLock log;
127  output(log.os);
128  }
129 }
130 
132 {
133  DB(5,"ProxyPullConsumer_i::disconnect_pull_consumer()");
134  eraseKey("SupplierAdmin/ProxyPullConsumer");
136  if(CORBA::is_nil(_target))
137  {
138  throw CORBA::OBJECT_NOT_EXIST(
139  IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
140  CORBA::COMPLETED_NO
141  );
142  }
143  else
144  {
145  CORBA::Request_var req=_target->_request("disconnect_pull_supplier");
146  _target=CosEventComm::PullSupplier::_nil();
147  req->send_deferred();
148  Orb::inst().deferredRequest(req._retn());
149  }
150 }
151 
152 //
153 
155  PortableServer::POA_ptr poa,
156  list<CORBA::Any*>& q
157 )
158 : Proxy(poa),
159  _target(CosEventComm::PullSupplier::_nil()),
160  _queue(q),
161  _mode(Pull), // Prefer 'pull' method calls.
162  _exceptionCount(0)
163 {}
164 
166 {
167  DB(20,"~ProxyPullConsumer_i()")
168 }
169 
171 {
172  if(!CORBA::is_nil(_req) && _req->poll_response())
173  {
174  const char* opname =_req->operation();
175  assert(opname);
176  CORBA::Environment_ptr env =_req->env(); // No need to release environment.
177 
178  if(!CORBA::is_nil(env) && env->exception())
179  {
180  CORBA::Exception* ex =env->exception(); // No need to free exception.
181  DB(10,"ProxyPullConsumer got exception"
182  IF_OMNIORB4(<<": "<<ex->_name())<<", op:"<<opname);
183  if(0==strcmp("pull",opname) || 0==strcmp("try_pull",opname))
184  {
185  ++_exceptionCount;
186  _mode=( _mode==Pull? TryPull: Pull ); // Try something else next time.
187  }
188  else
189  DB(2,"Ignoring unrecognised response. operation:"<<opname);
190  if(_exceptionCount>=4)
191  {
193 
194  // Try to notify the Supplier that the connection is closing.
195  CORBA::Request_var req=_target->_request("disconnect_pull_supplier");
196  req->send_deferred();
197  Orb::inst().deferredRequest(req._retn());
198 
199  _target=CosEventComm::PullSupplier::_nil(); // disconnected
200  eraseKey("SupplierAdmin/ProxyPullConsumer");
202  }
203  }
204  else
205  {
206  // Do we have an event?
207  bool hasEvent=false;
208  if(0==strcmp("pull",opname))
209  {
210  hasEvent=true;
211  }
212  else if(0==strcmp("try_pull",opname))
213  {
214  CORBA::NVList_ptr args=_req->arguments(); // No need to release args.
215  if(args->count()==1)
216  {
217  CORBA::NamedValue_var hasEventArg=args->item(0);
218  if(0==strcmp(hasEventArg->name(),"has_event"))
219  {
220  CORBA::Any* a =hasEventArg->value();
221  CORBA::Boolean b;
222  CORBA::Any::to_boolean tb(b); //MS VC++6 is on drugs!
223  hasEvent=(((*a)>>=tb) && b);
224  }
225  }
226  }
227  // Pick up an event, if we have one.
228  if(hasEvent)
229  {
230  CORBA::Any* event =new CORBA::Any();
231  _req->return_value() >>= (*event);
232  _queue.push_back(event);
233  }
234  // Reset the exception count.
235  _exceptionCount=0;
236  }
237  _req=CORBA::Request::_nil();
238  }
239 } // ProxyPullConsumer_i::end collect()
240 
242 {
243  if(CORBA::is_nil(_req) && !CORBA::is_nil(_target))
244  {
245  switch(_mode)
246  {
247  case Pull:
248  _req=_target->_request("pull");
249  break;
250  case TryPull:
251  _req=_target->_request("try_pull");
252  _req->add_out_arg("has_event")<<=CORBA::Any::from_boolean(1);
253  break;
254  default:
255  assert(0);
256  }
257  _req->set_return_type(CORBA::_tc_any);
258  _req->send_deferred();
259  }
260 }
261 
263  const string& oid,
264  const PersistNode& node
265 )
266 {
267  CosEventComm::PullSupplier_var pullSupplier =
268  string_to_<CosEventComm::PullSupplier>(node.attrString("IOR").c_str());
269  // Do not activate until we know that we have read a valid target.
270  activateObjectWithId(oid.c_str());
271  connect_pull_supplier(pullSupplier.in());
272 }
273 
275 {
276  basicOutput(os,"SupplierAdmin/ProxyPullConsumer",_target.in());
277 }
278 
279 }; // end namespace OmniEvents
Implementation of the ProxyPullConsumer interface.
void reincarnate(const string &oid, const PersistNode &node)
Re-create a servant from information saved in the log file.
Base class for three of the four Proxy servants.
Definition: ProxyManager.h:104
PortableServer::POA_var _managedPoa
The POA owned & managed by this object.
Definition: ProxyManager.h:95
void collect()
Collects events that have arrived at connected proxies.
void triggerRequest()
When _req is NIL, sends out a new pull() or try_pull() call.
PortableServer::Servant incarnate(const PortableServer::ObjectId &oid, PortableServer::POA_ptr poa)
void activate(const char *name)
Creates the Proxy-type&#39;s POA, and registers this object as its ServantManager.
Base class for ServantActivator classes that manage Proxy servants.
Definition: ProxyManager.h:57
#define IF_OMNIORB4(omniORB4_code)
Definition: Orb.h:46
int _exceptionCount
Only when two consecutive exceptions have been received from each mode, do we consider the connection...
void disconnect()
Send disconnect_pull_supplier() to all connected PullSuppliers.
#define DB(l, x)
Definition: Orb.h:49
string attrString(const string &key, const string &fallback="") const
Definition: PersistNode.cc:155
void deferredRequest(CORBA::Request_ptr req, Callback *callback=NULL)
Adopts the request and then stores it in _deferredRequests.
Definition: Orb.cc:187
void deactivateObject()
Calls deactivate_object() to deactivate this servant in its POA.
Definition: Servant.cc:160
void basicOutput(ostream &os, const char *name, CORBA::Object_ptr target=CORBA::Object::_nil(), const char *extraAttributes=NULL)
Helper method for constructing persistency output.
void output(ostream &os)
Save this object&#39;s state to a stream.
static bool exists()
Library code may create Event Service objects without the need for persistency.
void connect_pull_supplier(CosEventComm::PullSupplier_ptr pullSupplier)
CosEventComm::PullSupplier_var _target
void eraseKey(const char *name)
Helper method for constructing persistency output.
void output(ostream &os)
Save this object&#39;s state to a stream.
Definition: ProxyManager.cc:87
void activateObjectWithId(const char *oidStr)
Calls activate_object_with_id() to activate this servant in its POA.
Definition: Servant.cc:125
set< Proxy * > _servants
The set of all active Proxies in this object&#39;s _managedPoa.
Definition: ProxyManager.h:90
ProxyPullConsumer_i(PortableServer::POA_ptr poa, list< CORBA::Any * > &q)
OMNIEVENTS__DEBUG_REF_COUNTS__DECL CosEventChannelAdmin::ProxyPullConsumer_ptr createObject()
static Orb & inst()
Definition: Orb.h:81
#define OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(C)
Defines debug versions of _add/remove_ref() for class C.
Definition: Servant.h:70
Obtains an output stream to the active persistancy logfile, and locks it for exclusive access...
#define IFELSE_OMNIORB4(omniORB4_code, default_code)
Definition: Orb.h:45
ProxyPullConsumerManager(PortableServer::POA_ptr parentPoa, list< CORBA::Any * > &q)
void triggerRequest()
For each connected proxy, if there is no request in progress, send a new request to the current opera...
CORBA::Request_var _req
Definition: ProxyManager.h:128
#define HERE
Generates a string literal that describes the filename and line number.
void reportObjectFailure(const char *here, CORBA::Object_ptr obj, CORBA::Exception *ex)
Called by omniEvents when an object has failed (fatal exception).
Definition: Orb.cc:204
void collect()
Collects responses since the last trigger.