| File: | blib/lib/OpenSRF/AppSession.pm |
| Coverage: | 7.7% |
| line | stmt | bran | cond | sub | pod | time | code |
|---|---|---|---|---|---|---|---|
| 1 | package OpenSRF::AppSession; | ||||||
| 2 | 9 9 9 | 85 35 168 | use OpenSRF::DomainObject::oilsMessage; | ||||
| 3 | 9 9 9 | 152 45 101 | use OpenSRF::DomainObject::oilsMethod; | ||||
| 4 | 9 9 9 | 73 35 67 | use OpenSRF::DomainObject::oilsResponse qw/:status/; | ||||
| 5 | 9 9 9 | 114 38 120 | use OpenSRF::Transport::PeerHandle; | ||||
| 6 | 9 9 9 | 72 30 65 | use OpenSRF::Utils::JSON; | ||||
| 7 | 9 9 9 | 63 32 57 | use OpenSRF::Utils::Logger qw(:level); | ||||
| 8 | 9 9 9 | 107 32 85 | use OpenSRF::Utils::SettingsClient; | ||||
| 9 | 9 9 9 | 66 33 59 | use OpenSRF::Utils::Config; | ||||
| 10 | 9 9 9 | 61 31 53 | use OpenSRF::EX; | ||||
| 11 | 9 9 9 | 64 31 71 | use OpenSRF; | ||||
| 12 | 9 9 9 | 60 35 57 | use Exporter; | ||||
| 13 | 9 9 9 | 61 32 79 | use base qw/Exporter OpenSRF/; | ||||
| 14 | 9 9 9 | 61 31 71 | use Time::HiRes qw( time usleep ); | ||||
| 15 | 9 9 9 | 65 32 79 | use warnings; | ||||
| 16 | 9 9 9 | 1182 47 68 | use strict; | ||||
| 17 | |||||||
| 18 | our @EXPORT_OK = qw/CONNECTING INIT_CONNECTED CONNECTED DISCONNECTED CLIENT SERVER/; | ||||||
| 19 | our %EXPORT_TAGS = ( state => [ qw/CONNECTING INIT_CONNECTED CONNECTED DISCONNECTED/ ], | ||||||
| 20 | endpoint => [ qw/CLIENT SERVER/ ], | ||||||
| 21 | ); | ||||||
| 22 | |||||||
| 23 | my $logger = "OpenSRF::Utils::Logger"; | ||||||
| 24 | my $_last_locale = 'en-US'; | ||||||
| 25 | |||||||
| 26 | our %_CACHE; | ||||||
| 27 | our @_RESEND_QUEUE; | ||||||
| 28 | |||||||
| 29 | 0 | 0 | sub CONNECTING { return 3 }; | ||||
| 30 | 0 | 0 | sub INIT_CONNECTED { return 4 }; | ||||
| 31 | 0 | 0 | sub CONNECTED { return 1 }; | ||||
| 32 | 0 | 0 | sub DISCONNECTED { return 2 }; | ||||
| 33 | |||||||
| 34 | 0 | 0 | sub CLIENT { return 2 }; | ||||
| 35 | 0 | 0 | sub SERVER { return 1 }; | ||||
| 36 | |||||||
| 37 | sub find { | ||||||
| 38 | 0 | 0 | return undef unless (defined $_[1]); | ||||
| 39 | 0 | return $_CACHE{$_[1]} if (exists($_CACHE{$_[1]})); | |||||
| 40 | } | ||||||
| 41 | |||||||
| 42 | sub transport_connected { | ||||||
| 43 | 0 | 0 | my $self = shift; | ||||
| 44 | 0 | if( ! exists $self->{peer_handle} || ! $self->{peer_handle} ) { | |||||
| 45 | 0 | return 0; | |||||
| 46 | } | ||||||
| 47 | 0 | return $self->{peer_handle}->tcp_connected(); | |||||
| 48 | } | ||||||
| 49 | |||||||
| 50 | sub connected { | ||||||
| 51 | 0 | 0 | my $self = shift; | ||||
| 52 | 0 | return $self->state == CONNECTED; | |||||
| 53 | } | ||||||
| 54 | # ---------------------------------------------------------------------------- | ||||||
| 55 | # Clears the transport buffers | ||||||
| 56 | # call this if you are not through with the session, but you want | ||||||
| 57 | # to have a clean slate. You shouldn't have to call this if | ||||||
| 58 | # you are correctly 'recv'ing all of the data from a request. | ||||||
| 59 | # however, if you don't want all of the data, this will | ||||||
| 60 | # slough off any excess | ||||||
| 61 | # * * Note: This will delete data for all sessions using this transport | ||||||
| 62 | # handle. For example, all client sessions use the same handle. | ||||||
| 63 | # ---------------------------------------------------------------------------- | ||||||
| 64 | sub buffer_reset { | ||||||
| 65 | |||||||
| 66 | 0 | 0 | my $self = shift; | ||||
| 67 | 0 | if( ! exists $self->{peer_handle} || ! $self->{peer_handle} ) { | |||||
| 68 | 0 | return 0; | |||||
| 69 | } | ||||||
| 70 | 0 | $self->{peer_handle}->buffer_reset(); | |||||
| 71 | } | ||||||
| 72 | |||||||
| 73 | |||||||
| 74 | # when any incoming data is received, this method is called. | ||||||
| 75 | sub server_build { | ||||||
| 76 | 0 | 0 | my $class = shift; | ||||
| 77 | 0 | $class = ref($class) || $class; | |||||
| 78 | |||||||
| 79 | 0 | my $sess_id = shift; | |||||
| 80 | 0 | my $remote_id = shift; | |||||
| 81 | 0 | my $service = shift; | |||||
| 82 | |||||||
| 83 | 0 | warn "Missing args to server_build():\n" . | |||||
| 84 | "sess_id: $sess_id, remote_id: $remote_id, service: $service\n" | ||||||
| 85 | unless ($sess_id and $remote_id and $service); | ||||||
| 86 | |||||||
| 87 | 0 | return undef unless ($sess_id and $remote_id and $service); | |||||
| 88 | |||||||
| 89 | 0 | if ( my $thingy = $class->find($sess_id) ) { | |||||
| 90 | 0 | $thingy->remote_id( $remote_id ); | |||||
| 91 | 0 | return $thingy; | |||||
| 92 | } | ||||||
| 93 | |||||||
| 94 | 0 | if( $service eq "client" ) { | |||||
| 95 | #throw OpenSRF::EX::PANIC ("Attempting to build a client session as a server" . | ||||||
| 96 | # " Session ID [$sess_id], remote_id [$remote_id]"); | ||||||
| 97 | |||||||
| 98 | 0 | warn "Attempting to build a client session as ". | |||||
| 99 | "a server Session ID [$sess_id], remote_id [$remote_id]"; | ||||||
| 100 | |||||||
| 101 | 0 | $logger->debug("Attempting to build a client session as ". | |||||
| 102 | "a server Session ID [$sess_id], remote_id [$remote_id]", ERROR ); | ||||||
| 103 | |||||||
| 104 | 0 | return undef; | |||||
| 105 | } | ||||||
| 106 | |||||||
| 107 | 0 | my $config_client = OpenSRF::Utils::SettingsClient->new(); | |||||
| 108 | 0 | my $stateless = $config_client->config_value("apps", $service, "stateless"); | |||||
| 109 | |||||||
| 110 | #my $max_requests = $conf->$service->max_requests; | ||||||
| 111 | 0 | my $max_requests = $config_client->config_value("apps",$service,"max_requests"); | |||||
| 112 | 0 | $logger->debug( "Max Requests for $service is $max_requests", INTERNAL ) if (defined $max_requests); | |||||
| 113 | |||||||
| 114 | 0 | $logger->transport( "AppSession creating new session: $sess_id", INTERNAL ); | |||||
| 115 | |||||||
| 116 | 0 | my $self = bless { recv_queue => [], | |||||
| 117 | request_queue => [], | ||||||
| 118 | requests => 0, | ||||||
| 119 | session_data => {}, | ||||||
| 120 | callbacks => {}, | ||||||
| 121 | endpoint => SERVER, | ||||||
| 122 | state => CONNECTING, | ||||||
| 123 | session_id => $sess_id, | ||||||
| 124 | remote_id => $remote_id, | ||||||
| 125 | peer_handle => OpenSRF::Transport::PeerHandle->retrieve($service), | ||||||
| 126 | max_requests => $max_requests, | ||||||
| 127 | session_threadTrace => 0, | ||||||
| 128 | service => $service, | ||||||
| 129 | stateless => $stateless, | ||||||
| 130 | } => $class; | ||||||
| 131 | |||||||
| 132 | 0 | return $_CACHE{$sess_id} = $self; | |||||
| 133 | } | ||||||
| 134 | |||||||
| 135 | sub session_data { | ||||||
| 136 | 0 | 0 | my $self = shift; | ||||
| 137 | 0 | my ($name, $datum) = @_; | |||||
| 138 | |||||||
| 139 | 0 | $self->{session_data}->{$name} = $datum if (defined $datum); | |||||
| 140 | 0 | return $self->{session_data}->{$name}; | |||||
| 141 | } | ||||||
| 142 | |||||||
| 143 | 0 | 0 | sub service { return shift()->{service}; } | ||||
| 144 | |||||||
| 145 | sub continue_request { | ||||||
| 146 | 0 | 0 | my $self = shift; | ||||
| 147 | 0 | $self->{'requests'}++; | |||||
| 148 | 0 | return 1 if (!$self->{'max_requests'}); | |||||
| 149 | 0 | return $self->{'requests'} <= $self->{'max_requests'} ? 1 : 0; | |||||
| 150 | } | ||||||
| 151 | |||||||
| 152 | sub last_sent_payload { | ||||||
| 153 | 0 | 0 | my( $self, $payload ) = @_; | ||||
| 154 | 0 | if( $payload ) { | |||||
| 155 | 0 | return $self->{'last_sent_payload'} = $payload; | |||||
| 156 | } | ||||||
| 157 | 0 | return $self->{'last_sent_payload'}; | |||||
| 158 | } | ||||||
| 159 | |||||||
| 160 | sub session_locale { | ||||||
| 161 | 0 | 0 | my( $self, $type ) = @_; | ||||
| 162 | 0 | if( $type ) { | |||||
| 163 | 0 | $_last_locale = $type if ($self->endpoint == SERVER); | |||||
| 164 | 0 | return $self->{'session_locale'} = $type; | |||||
| 165 | } | ||||||
| 166 | 0 | return $self->{'session_locale'}; | |||||
| 167 | } | ||||||
| 168 | |||||||
| 169 | sub last_sent_type { | ||||||
| 170 | 0 | 0 | my( $self, $type ) = @_; | ||||
| 171 | 0 | if( $type ) { | |||||
| 172 | 0 | return $self->{'last_sent_type'} = $type; | |||||
| 173 | } | ||||||
| 174 | 0 | return $self->{'last_sent_type'}; | |||||
| 175 | } | ||||||
| 176 | |||||||
| 177 | sub get_app_targets { | ||||||
| 178 | 0 | 0 | my $app = shift; | ||||
| 179 | |||||||
| 180 | 0 | my $conf = OpenSRF::Utils::Config->current; | |||||
| 181 | 0 | my $router_name = $conf->bootstrap->router_name || 'router'; | |||||
| 182 | 0 | my $domain = $conf->bootstrap->domain; | |||||
| 183 | 0 | $logger->error("use of <domains/> is deprecated") if $conf->bootstrap->domains; | |||||
| 184 | |||||||
| 185 | 0 | unless($router_name and $domain) { | |||||
| 186 | 0 | throw OpenSRF::EX::Config | |||||
| 187 | ("Missing router config information 'router_name' and 'domain'"); | ||||||
| 188 | } | ||||||
| 189 | |||||||
| 190 | 0 | return ("$router_name\@$domain/$app"); | |||||
| 191 | } | ||||||
| 192 | |||||||
| 193 | sub stateless { | ||||||
| 194 | 0 | 0 | my $self = shift; | ||||
| 195 | 0 | my $state = shift; | |||||
| 196 | 0 | $self->{stateless} = $state if (defined $state); | |||||
| 197 | 0 | return $self->{stateless}; | |||||
| 198 | } | ||||||
| 199 | |||||||
| 200 | # When we're a client and we want to connect to a remote service | ||||||
| 201 | sub create { | ||||||
| 202 | 0 | 0 | my $class = shift; | ||||
| 203 | 0 | $class = ref($class) || $class; | |||||
| 204 | |||||||
| 205 | 0 | my $app = shift; | |||||
| 206 | 0 | my $api_level = shift; | |||||
| 207 | 0 | my $quiet = shift; | |||||
| 208 | 0 | my $locale = shift || $_last_locale; | |||||
| 209 | |||||||
| 210 | 0 | $api_level = 1 if (!defined($api_level)); | |||||
| 211 | |||||||
| 212 | 0 | $logger->debug( "AppSession creating new client session for $app", DEBUG ); | |||||
| 213 | |||||||
| 214 | 0 | my $stateless = 0; | |||||
| 215 | 0 | my $c = OpenSRF::Utils::SettingsClient->new(); | |||||
| 216 | # we can get an infinite loop if we're grabbing the settings and we | ||||||
| 217 | # need the settings to grab the settings... | ||||||
| 218 | 0 | if($app ne "opensrf.settings" || $c->has_config()) { | |||||
| 219 | 0 | $stateless = $c->config_value("apps", $app, "stateless"); | |||||
| 220 | } | ||||||
| 221 | |||||||
| 222 | 0 | my $sess_id = time . rand( $$ ); | |||||
| 223 | 0 | while ( $class->find($sess_id) ) { | |||||
| 224 | 0 | $sess_id = time . rand( $$ ); | |||||
| 225 | } | ||||||
| 226 | |||||||
| 227 | |||||||
| 228 | 0 | my ($r_id) = get_app_targets($app); | |||||
| 229 | |||||||
| 230 | 0 | my $peer_handle = OpenSRF::Transport::PeerHandle->retrieve("client"); | |||||
| 231 | 0 | if( ! $peer_handle ) { | |||||
| 232 | 0 | $peer_handle = OpenSRF::Transport::PeerHandle->retrieve("system_client"); | |||||
| 233 | } | ||||||
| 234 | |||||||
| 235 | 0 | my $self = bless { app_name => $app, | |||||
| 236 | request_queue => [], | ||||||
| 237 | endpoint => CLIENT, | ||||||
| 238 | state => DISCONNECTED,#since we're init'ing | ||||||
| 239 | session_id => $sess_id, | ||||||
| 240 | remote_id => $r_id, | ||||||
| 241 | raise_error => $quiet ? 0 : 1, | ||||||
| 242 | session_locale => $locale, | ||||||
| 243 | api_level => $api_level, | ||||||
| 244 | orig_remote_id => $r_id, | ||||||
| 245 | peer_handle => $peer_handle, | ||||||
| 246 | session_threadTrace => 0, | ||||||
| 247 | stateless => $stateless, | ||||||
| 248 | } => $class; | ||||||
| 249 | |||||||
| 250 | 0 | $logger->debug( "Created new client session $app : $sess_id" ); | |||||
| 251 | |||||||
| 252 | 0 | return $_CACHE{$sess_id} = $self; | |||||
| 253 | } | ||||||
| 254 | |||||||
| 255 | sub raise_remote_errors { | ||||||
| 256 | 0 | 0 | my $self = shift; | ||||
| 257 | 0 | my $err = shift; | |||||
| 258 | 0 | $self->{raise_error} = $err if (defined $err); | |||||
| 259 | 0 | return $self->{raise_error}; | |||||
| 260 | } | ||||||
| 261 | |||||||
| 262 | sub api_level { | ||||||
| 263 | 0 | 0 | return shift()->{api_level}; | ||||
| 264 | } | ||||||
| 265 | |||||||
| 266 | sub app { | ||||||
| 267 | 0 | 0 | return shift()->{app_name}; | ||||
| 268 | } | ||||||
| 269 | |||||||
| 270 | sub reset { | ||||||
| 271 | 0 | 0 | my $self = shift; | ||||
| 272 | 0 | $self->remote_id($$self{orig_remote_id}); | |||||
| 273 | } | ||||||
| 274 | |||||||
| 275 | # 'connect' can be used as a constructor if called as a class method, | ||||||
| 276 | # or used to connect a session that has disconnected if called against | ||||||
| 277 | # an existing session that seems to be disconnected, or was just built | ||||||
| 278 | # using 'create' above. | ||||||
| 279 | |||||||
| 280 | # connect( $app, username => $user, secret => $passwd ); | ||||||
| 281 | # OR | ||||||
| 282 | # connect( $app, sysname => $user, secret => $shared_secret ); | ||||||
| 283 | |||||||
| 284 | # --- Returns undef if the connect attempt times out. | ||||||
| 285 | # --- Returns the OpenSRF::EX object if one is returned by the server | ||||||
| 286 | # --- Returns self if connected | ||||||
| 287 | sub connect { | ||||||
| 288 | 0 | 0 | my $self = shift; | ||||
| 289 | 0 | my $class = ref($self) || $self; | |||||
| 290 | |||||||
| 291 | |||||||
| 292 | 0 | if ( ref( $self ) and $self->state && $self->state == CONNECTED ) { | |||||
| 293 | 0 | $logger->transport("AppSession already connected", DEBUG ); | |||||
| 294 | } else { | ||||||
| 295 | 0 | $logger->transport("AppSession not connected, connecting..", DEBUG ); | |||||
| 296 | } | ||||||
| 297 | 0 | return $self if ( ref( $self ) and $self->state && $self->state == CONNECTED ); | |||||
| 298 | |||||||
| 299 | |||||||
| 300 | 0 | my $app = shift; | |||||
| 301 | 0 | my $api_level = shift; | |||||
| 302 | 0 | $api_level = 1 unless (defined $api_level); | |||||
| 303 | |||||||
| 304 | 0 | $self = $class->create($app, @_) if (!ref($self)); | |||||
| 305 | |||||||
| 306 | 0 | return undef unless ($self); | |||||
| 307 | |||||||
| 308 | 0 | $self->{api_level} = $api_level; | |||||
| 309 | |||||||
| 310 | 0 | $self->reset; | |||||
| 311 | 0 | $self->state(CONNECTING); | |||||
| 312 | 0 | $self->send('CONNECT', ""); | |||||
| 313 | |||||||
| 314 | |||||||
| 315 | # if we want to connect to settings, we may not have | ||||||
| 316 | # any data for the settings client to work with... | ||||||
| 317 | # just using a default for now XXX | ||||||
| 318 | |||||||
| 319 | 0 | my $time_remaining = 5; | |||||
| 320 | |||||||
| 321 | |||||||
| 322 | # my $client = OpenSRF::Utils::SettingsClient->new(); | ||||||
| 323 | # my $trans = $client->config_value("client_connection","transport_host"); | ||||||
| 324 | # | ||||||
| 325 | # if(!ref($trans)) { | ||||||
| 326 | # $time_remaining = $trans->{connect_timeout}; | ||||||
| 327 | # } else { | ||||||
| 328 | # # XXX for now, just use the first | ||||||
| 329 | # $time_remaining = $trans->[0]->{connect_timeout}; | ||||||
| 330 | # } | ||||||
| 331 | |||||||
| 332 | 0 | while ( $self->state != CONNECTED and $time_remaining > 0 ) { | |||||
| 333 | 0 | my $starttime = time; | |||||
| 334 | 0 | $self->queue_wait($time_remaining); | |||||
| 335 | 0 | my $endtime = time; | |||||
| 336 | 0 | $time_remaining -= ($endtime - $starttime); | |||||
| 337 | } | ||||||
| 338 | |||||||
| 339 | 0 | return undef unless($self->state == CONNECTED); | |||||
| 340 | |||||||
| 341 | 0 | $self->stateless(0); | |||||
| 342 | |||||||
| 343 | 0 | return $self; | |||||
| 344 | } | ||||||
| 345 | |||||||
| 346 | sub finish { | ||||||
| 347 | 0 | 0 | my $self = shift; | ||||
| 348 | 0 | if( ! $self->session_id ) { | |||||
| 349 | 0 | return 0; | |||||
| 350 | } | ||||||
| 351 | } | ||||||
| 352 | |||||||
| 353 | sub unregister_callback { | ||||||
| 354 | 0 | 0 | my $self = shift; | ||||
| 355 | 0 | my $type = shift; | |||||
| 356 | 0 | my $cb = shift; | |||||
| 357 | 0 | if (exists $self->{callbacks}{$type}) { | |||||
| 358 | 0 | delete $self->{callbacks}{$type}{$cb}; | |||||
| 359 | 0 | return $cb; | |||||
| 360 | } | ||||||
| 361 | 0 | return undef; | |||||
| 362 | } | ||||||
| 363 | |||||||
| 364 | sub register_callback { | ||||||
| 365 | 0 | 0 | my $self = shift; | ||||
| 366 | 0 | my $type = shift; | |||||
| 367 | 0 | my $cb = shift; | |||||
| 368 | 0 | my $cb_key = "$cb"; | |||||
| 369 | 0 | $self->{callbacks}{$type}{$cb_key} = $cb; | |||||
| 370 | 0 | return $cb_key; | |||||
| 371 | } | ||||||
| 372 | |||||||
| 373 | sub kill_me { | ||||||
| 374 | 0 | 0 | my $self = shift; | ||||
| 375 | 0 0 | if( ! $self->session_id ) { return 0; } | |||||
| 376 | |||||||
| 377 | # run each 'death' callback; | ||||||
| 378 | 0 | if (exists $self->{callbacks}{death}) { | |||||
| 379 | 0 0 | for my $sub (values %{$self->{callbacks}{death}}) { | |||||
| 380 | 0 | $sub->($self); | |||||
| 381 | } | ||||||
| 382 | } | ||||||
| 383 | |||||||
| 384 | 0 | $self->disconnect; | |||||
| 385 | 0 | $logger->transport( "AppSession killing self: " . $self->session_id(), DEBUG ); | |||||
| 386 | 0 | delete $_CACHE{$self->session_id}; | |||||
| 387 | 0 0 | delete($$self{$_}) for (keys %$self); | |||||
| 388 | } | ||||||
| 389 | |||||||
| 390 | sub disconnect { | ||||||
| 391 | 0 | 0 | my $self = shift; | ||||
| 392 | |||||||
| 393 | # run each 'disconnect' callback; | ||||||
| 394 | 0 | if (exists $self->{callbacks}{disconnect}) { | |||||
| 395 | 0 0 | for my $sub (values %{$self->{callbacks}{disconnect}}) { | |||||
| 396 | 0 | $sub->($self); | |||||
| 397 | } | ||||||
| 398 | } | ||||||
| 399 | |||||||
| 400 | 0 | if ( !$self->stateless and $self->state != DISCONNECTED ) { | |||||
| 401 | 0 | $self->send('DISCONNECT', "") if ($self->endpoint == CLIENT); | |||||
| 402 | 0 | $self->state( DISCONNECTED ); | |||||
| 403 | } | ||||||
| 404 | |||||||
| 405 | 0 | $self->reset; | |||||
| 406 | } | ||||||
| 407 | |||||||
| 408 | sub request { | ||||||
| 409 | 0 | 0 | my $self = shift; | ||||
| 410 | 0 | my $meth = shift; | |||||
| 411 | 0 | return unless $self; | |||||
| 412 | |||||||
| 413 | # tell the logger to create a new xid - the logger will decide if it's really necessary | ||||||
| 414 | 0 | $logger->mk_osrf_xid; | |||||
| 415 | |||||||
| 416 | 0 | my $method; | |||||
| 417 | 0 | if (!ref $meth) { | |||||
| 418 | 0 | $method = new OpenSRF::DomainObject::oilsMethod ( method => $meth ); | |||||
| 419 | } else { | ||||||
| 420 | 0 | $method = $meth; | |||||
| 421 | } | ||||||
| 422 | |||||||
| 423 | 0 | $method->params( @_ ); | |||||
| 424 | |||||||
| 425 | 0 | $self->send('REQUEST',$method); | |||||
| 426 | } | ||||||
| 427 | |||||||
| 428 | sub full_request { | ||||||
| 429 | 0 | 0 | my $self = shift; | ||||
| 430 | 0 | my $meth = shift; | |||||
| 431 | |||||||
| 432 | 0 | my $method; | |||||
| 433 | 0 | if (!ref $meth) { | |||||
| 434 | 0 | $method = new OpenSRF::DomainObject::oilsMethod ( method => $meth ); | |||||
| 435 | } else { | ||||||
| 436 | 0 | $method = $meth; | |||||
| 437 | } | ||||||
| 438 | |||||||
| 439 | 0 | $method->params( @_ ); | |||||
| 440 | |||||||
| 441 | 0 | $self->send(CONNECT => '', REQUEST => $method, DISCONNECT => ''); | |||||
| 442 | } | ||||||
| 443 | |||||||
| 444 | sub send { | ||||||
| 445 | 0 | 0 | my $self = shift; | ||||
| 446 | 0 | my @payload_list = @_; # this is a Domain Object | |||||
| 447 | |||||||
| 448 | 0 | return unless ($self and $self->{peer_handle}); | |||||
| 449 | |||||||
| 450 | 0 | $logger->debug( "In send", INTERNAL ); | |||||
| 451 | |||||||
| 452 | 0 | my $tT; | |||||
| 453 | |||||||
| 454 | 0 0 | if( @payload_list % 2 ) { $tT = pop @payload_list; } | |||||
| 455 | |||||||
| 456 | 0 | if( ! @payload_list ) { | |||||
| 457 | 0 | $logger->debug( "payload_list param is incomplete in AppSession::send()", ERROR ); | |||||
| 458 | 0 | return undef; | |||||
| 459 | } | ||||||
| 460 | |||||||
| 461 | 0 | my @doc = (); | |||||
| 462 | |||||||
| 463 | 0 | my $disconnect = 0; | |||||
| 464 | 0 | my $connecting = 0; | |||||
| 465 | |||||||
| 466 | 0 | while( @payload_list ) { | |||||
| 467 | |||||||
| 468 | 0 | my ($msg_type, $payload) = ( shift(@payload_list), shift(@payload_list) ); | |||||
| 469 | |||||||
| 470 | 0 | if ($msg_type eq 'DISCONNECT' ) { | |||||
| 471 | 0 | $disconnect++; | |||||
| 472 | 0 | if( $self->state == DISCONNECTED && !$connecting) { | |||||
| 473 | 0 | next; | |||||
| 474 | } | ||||||
| 475 | } | ||||||
| 476 | |||||||
| 477 | 0 | if( $msg_type eq "CONNECT" ) { | |||||
| 478 | 0 | $connecting++; | |||||
| 479 | } | ||||||
| 480 | |||||||
| 481 | 0 | my $msg = OpenSRF::DomainObject::oilsMessage->new(); | |||||
| 482 | 0 | $msg->type($msg_type); | |||||
| 483 | |||||||
| 484 | 9 9 9 | 106 36 64 | no warnings; | ||||
| 485 | 0 | $msg->threadTrace( $tT || int($self->session_threadTrace) || int($self->last_threadTrace) ); | |||||
| 486 | 9 9 9 | 62 42 65 | use warnings; | ||||
| 487 | |||||||
| 488 | 0 | if ($msg->type eq 'REQUEST') { | |||||
| 489 | 0 | if ( !defined($tT) || $self->last_threadTrace != $tT ) { | |||||
| 490 | 0 | $msg->update_threadTrace; | |||||
| 491 | 0 | $self->session_threadTrace( $msg->threadTrace ); | |||||
| 492 | 0 | $tT = $self->session_threadTrace; | |||||
| 493 | 0 | OpenSRF::AppRequest->new($self, $payload); | |||||
| 494 | } | ||||||
| 495 | } | ||||||
| 496 | |||||||
| 497 | 0 | $msg->api_level($self->api_level); | |||||
| 498 | 0 | $msg->payload($payload) if $payload; | |||||
| 499 | |||||||
| 500 | 0 | my $locale = $self->session_locale; | |||||
| 501 | 0 | $msg->sender_locale($locale) if ($locale); | |||||
| 502 | |||||||
| 503 | 0 | push @doc, $msg; | |||||
| 504 | |||||||
| 505 | |||||||
| 506 | 0 | $logger->debug( "AppSession sending ".$msg->type." to ".$self->remote_id. | |||||
| 507 | " with threadTrace [".$msg->threadTrace."]"); | ||||||
| 508 | |||||||
| 509 | } | ||||||
| 510 | |||||||
| 511 | 0 | if ($self->endpoint == CLIENT and ! $disconnect) { | |||||
| 512 | 0 | $self->queue_wait(0); | |||||
| 513 | |||||||
| 514 | |||||||
| 515 | 0 | if($self->stateless && $self->state != CONNECTED) { | |||||
| 516 | 0 | $self->reset; | |||||
| 517 | 0 | $logger->debug("AppSession is stateless in send", INTERNAL ); | |||||
| 518 | } | ||||||
| 519 | |||||||
| 520 | 0 | if( !$self->stateless and $self->state != CONNECTED ) { | |||||
| 521 | |||||||
| 522 | 0 | $logger->debug( "Sending connect before request 1", INTERNAL ); | |||||
| 523 | |||||||
| 524 | 0 | unless (($self->state == CONNECTING && $connecting )) { | |||||
| 525 | 0 | $logger->debug( "Sending connect before request 2", INTERNAL ); | |||||
| 526 | 0 | my $v = $self->connect(); | |||||
| 527 | 0 | if( ! $v ) { | |||||
| 528 | 0 | $logger->debug( "Unable to connect to remote service in AppSession::send()", ERROR ); | |||||
| 529 | 0 | return undef; | |||||
| 530 | } | ||||||
| 531 | 0 | if( ref($v) and $v->can("class") and $v->class->isa( "OpenSRF::EX" ) ) { | |||||
| 532 | 0 | return $v; | |||||
| 533 | } | ||||||
| 534 | } | ||||||
| 535 | } | ||||||
| 536 | |||||||
| 537 | } | ||||||
| 538 | 0 | my $json = OpenSRF::Utils::JSON->perl2JSON(\@doc); | |||||
| 539 | 0 | $logger->internal("AppSession sending doc: $json"); | |||||
| 540 | |||||||
| 541 | 0 | $self->{peer_handle}->send( | |||||
| 542 | to => $self->remote_id, | ||||||
| 543 | thread => $self->session_id, | ||||||
| 544 | body => $json ); | ||||||
| 545 | |||||||
| 546 | 0 | if( $disconnect) { | |||||
| 547 | 0 | $self->state( DISCONNECTED ); | |||||
| 548 | } | ||||||
| 549 | |||||||
| 550 | 0 | my $req = $self->app_request( $tT ); | |||||
| 551 | 0 | $req->{_start} = time; | |||||
| 552 | 0 | return $req | |||||
| 553 | } | ||||||
| 554 | |||||||
| 555 | sub app_request { | ||||||
| 556 | 0 | 0 | my $self = shift; | ||||
| 557 | 0 | my $tT = shift; | |||||
| 558 | |||||||
| 559 | 0 | return undef unless (defined $tT); | |||||
| 560 | 0 0 0 | my ($req) = grep { $_->threadTrace == $tT } @{ $self->{request_queue} }; | |||||
| 561 | |||||||
| 562 | 0 | return $req; | |||||
| 563 | } | ||||||
| 564 | |||||||
| 565 | sub remove_app_request { | ||||||
| 566 | 0 | 0 | my $self = shift; | ||||
| 567 | 0 | my $req = shift; | |||||
| 568 | |||||||
| 569 | 0 0 0 | my @list = grep { $_->threadTrace != $req->threadTrace } @{ $self->{request_queue} }; | |||||
| 570 | |||||||
| 571 | 0 | $self->{request_queue} = \@list; | |||||
| 572 | } | ||||||
| 573 | |||||||
| 574 | sub endpoint { | ||||||
| 575 | 0 | 0 | return $_[0]->{endpoint}; | ||||
| 576 | } | ||||||
| 577 | |||||||
| 578 | |||||||
| 579 | sub session_id { | ||||||
| 580 | 0 | 0 | my $self = shift; | ||||
| 581 | 0 | return $self->{session_id}; | |||||
| 582 | } | ||||||
| 583 | |||||||
| 584 | sub push_queue { | ||||||
| 585 | 0 | 0 | my $self = shift; | ||||
| 586 | 0 | my $resp = shift; | |||||
| 587 | 0 | my $req = $self->app_request($resp->[1]); | |||||
| 588 | 0 | return $req->push_queue( $resp->[0] ) if ($req); | |||||
| 589 | 0 0 | push @{ $self->{recv_queue} }, $resp->[0]; | |||||
| 590 | } | ||||||
| 591 | |||||||
| 592 | sub last_threadTrace { | ||||||
| 593 | 0 | 0 | my $self = shift; | ||||
| 594 | 0 | my $new_last_threadTrace = shift; | |||||
| 595 | |||||||
| 596 | 0 | my $old_last_threadTrace = $self->{last_threadTrace}; | |||||
| 597 | 0 | if (defined $new_last_threadTrace) { | |||||
| 598 | 0 | $self->{last_threadTrace} = $new_last_threadTrace; | |||||
| 599 | 0 | return $new_last_threadTrace unless ($old_last_threadTrace); | |||||
| 600 | } | ||||||
| 601 | |||||||
| 602 | 0 | return $old_last_threadTrace; | |||||
| 603 | } | ||||||
| 604 | |||||||
| 605 | sub session_threadTrace { | ||||||
| 606 | 0 | 0 | my $self = shift; | ||||
| 607 | 0 | my $new_last_threadTrace = shift; | |||||
| 608 | |||||||
| 609 | 0 | my $old_last_threadTrace = $self->{session_threadTrace}; | |||||
| 610 | 0 | if (defined $new_last_threadTrace) { | |||||
| 611 | 0 | $self->{session_threadTrace} = $new_last_threadTrace; | |||||
| 612 | 0 | return $new_last_threadTrace unless ($old_last_threadTrace); | |||||
| 613 | } | ||||||
| 614 | |||||||
| 615 | 0 | return $old_last_threadTrace; | |||||
| 616 | } | ||||||
| 617 | |||||||
| 618 | sub last_message_type { | ||||||
| 619 | 0 | 0 | my $self = shift; | ||||
| 620 | 0 | my $new_last_message_type = shift; | |||||
| 621 | |||||||
| 622 | 0 | my $old_last_message_type = $self->{last_message_type}; | |||||
| 623 | 0 | if (defined $new_last_message_type) { | |||||
| 624 | 0 | $self->{last_message_type} = $new_last_message_type; | |||||
| 625 | 0 | return $new_last_message_type unless ($old_last_message_type); | |||||
| 626 | } | ||||||
| 627 | |||||||
| 628 | 0 | return $old_last_message_type; | |||||
| 629 | } | ||||||
| 630 | |||||||
| 631 | sub last_message_api_level { | ||||||
| 632 | 0 | 0 | my $self = shift; | ||||
| 633 | 0 | my $new_last_message_api_level = shift; | |||||
| 634 | |||||||
| 635 | 0 | my $old_last_message_api_level = $self->{last_message_api_level}; | |||||
| 636 | 0 | if (defined $new_last_message_api_level) { | |||||
| 637 | 0 | $self->{last_message_api_level} = $new_last_message_api_level; | |||||
| 638 | 0 | return $new_last_message_api_level unless ($old_last_message_api_level); | |||||
| 639 | } | ||||||
| 640 | |||||||
| 641 | 0 | return $old_last_message_api_level; | |||||
| 642 | } | ||||||
| 643 | |||||||
| 644 | sub remote_id { | ||||||
| 645 | 0 | 0 | my $self = shift; | ||||
| 646 | 0 | my $new_remote_id = shift; | |||||
| 647 | |||||||
| 648 | 0 | my $old_remote_id = $self->{remote_id}; | |||||
| 649 | 0 | if (defined $new_remote_id) { | |||||
| 650 | 0 | $self->{remote_id} = $new_remote_id; | |||||
| 651 | 0 | return $new_remote_id unless ($old_remote_id); | |||||
| 652 | } | ||||||
| 653 | |||||||
| 654 | 0 | return $old_remote_id; | |||||
| 655 | } | ||||||
| 656 | |||||||
| 657 | sub client_auth { | ||||||
| 658 | 0 | 0 | return undef; | ||||
| 659 | 0 | my $self = shift; | |||||
| 660 | 0 | my $new_ua = shift; | |||||
| 661 | |||||||
| 662 | 0 | my $old_ua = $self->{client_auth}; | |||||
| 663 | 0 | if (defined $new_ua) { | |||||
| 664 | 0 | $self->{client_auth} = $new_ua; | |||||
| 665 | 0 | return $new_ua unless ($old_ua); | |||||
| 666 | } | ||||||
| 667 | |||||||
| 668 | 0 | return $old_ua->cloneNode(1); | |||||
| 669 | } | ||||||
| 670 | |||||||
| 671 | sub state { | ||||||
| 672 | 0 | 0 | my $self = shift; | ||||
| 673 | 0 | my $new_state = shift; | |||||
| 674 | |||||||
| 675 | 0 | my $old_state = $self->{state}; | |||||
| 676 | 0 | if (defined $new_state) { | |||||
| 677 | 0 | $self->{state} = $new_state; | |||||
| 678 | 0 | return $new_state unless ($old_state); | |||||
| 679 | } | ||||||
| 680 | |||||||
| 681 | 0 | return $old_state; | |||||
| 682 | } | ||||||
| 683 | |||||||
| 684 | sub DESTROY { | ||||||
| 685 | 0 | my $self = shift; | |||||
| 686 | 0 0 | delete $$self{$_} for keys %$self; | |||||
| 687 | 0 | return undef; | |||||
| 688 | } | ||||||
| 689 | |||||||
| 690 | sub recv { | ||||||
| 691 | 0 | 0 | my $self = shift; | ||||
| 692 | 0 | my @proto_args = @_; | |||||
| 693 | 0 | my %args; | |||||
| 694 | |||||||
| 695 | 0 | if ( @proto_args ) { | |||||
| 696 | 0 | if ( !(@proto_args % 2) ) { | |||||
| 697 | 0 | %args = @proto_args; | |||||
| 698 | } elsif (@proto_args == 1) { | ||||||
| 699 | 0 | %args = ( timeout => @proto_args ); | |||||
| 700 | } | ||||||
| 701 | } | ||||||
| 702 | |||||||
| 703 | #$logger->debug( ref($self). " recv_queue before wait: " . $self->_print_queue(), INTERNAL ); | ||||||
| 704 | |||||||
| 705 | 0 | if( exists( $args{timeout} ) ) { | |||||
| 706 | 0 | $args{timeout} = int($args{timeout}); | |||||
| 707 | 0 | $self->{recv_timeout} = $args{timeout}; | |||||
| 708 | } | ||||||
| 709 | |||||||
| 710 | #$args{timeout} = 0 if ($self->complete); | ||||||
| 711 | |||||||
| 712 | 0 | if(defined($args{timeout})) { | |||||
| 713 | 0 | $logger->debug( ref($self) ."->recv with timeout " . $args{timeout}, INTERNAL ); | |||||
| 714 | } | ||||||
| 715 | |||||||
| 716 | 0 0 | my $avail = @{ $self->{recv_queue} }; | |||||
| 717 | 0 | $self->{remaining_recv_timeout} = $self->{recv_timeout}; | |||||
| 718 | |||||||
| 719 | 0 | if (!$args{count}) { | |||||
| 720 | 0 | if (wantarray) { | |||||
| 721 | 0 | $args{count} = $avail; | |||||
| 722 | } else { | ||||||
| 723 | 0 | $args{count} = 1; | |||||
| 724 | } | ||||||
| 725 | } | ||||||
| 726 | |||||||
| 727 | 0 | while ( $self->{remaining_recv_timeout} > 0 and $avail < $args{count} ) { | |||||
| 728 | 0 | last if $self->complete; | |||||
| 729 | 0 | my $starttime = time; | |||||
| 730 | 0 | $self->queue_wait($self->{remaining_recv_timeout}); | |||||
| 731 | 0 | my $endtime = time; | |||||
| 732 | 0 | if ($self->{timeout_reset}) { | |||||
| 733 | 0 | $self->{timeout_reset} = 0; | |||||
| 734 | } else { | ||||||
| 735 | 0 | $self->{remaining_recv_timeout} -= ($endtime - $starttime) | |||||
| 736 | } | ||||||
| 737 | 0 0 | $avail = @{ $self->{recv_queue} }; | |||||
| 738 | } | ||||||
| 739 | |||||||
| 740 | 0 | $self->timed_out(1) if ( $self->{remaining_recv_timeout} <= 0 ); | |||||
| 741 | |||||||
| 742 | 0 | my @list; | |||||
| 743 | 0 0 | while ( my $msg = shift @{ $self->{recv_queue} } ) { | |||||
| 744 | 0 | push @list, $msg; | |||||
| 745 | 0 | last if (scalar(@list) >= $args{count}); | |||||
| 746 | } | ||||||
| 747 | |||||||
| 748 | 0 | $logger->debug( "Number of matched responses: " . @list, DEBUG ); | |||||
| 749 | 0 | $self->queue_wait(0); # check for statuses | |||||
| 750 | |||||||
| 751 | 0 | return $list[0] if (!wantarray); | |||||
| 752 | 0 | return @list; | |||||
| 753 | } | ||||||
| 754 | |||||||
| 755 | sub timed_out { | ||||||
| 756 | 0 | 0 | my $self = shift; | ||||
| 757 | 0 | my $out = shift; | |||||
| 758 | 0 | $self->{timed_out} = $out if (defined $out); | |||||
| 759 | 0 | return $self->{timed_out}; | |||||
| 760 | } | ||||||
| 761 | |||||||
| 762 | sub push_resend { | ||||||
| 763 | 0 | 0 | my $self = shift; | ||||
| 764 | 0 | push @OpenSRF::AppSession::_RESEND_QUEUE, @_; | |||||
| 765 | } | ||||||
| 766 | |||||||
# Drain the resend queue.  Entries are shifted off one at a time, so an
# entry added while the loop is running is picked up as well; the loop
# stops at the first false entry or when the queue is empty.  Each entry
# that does not report itself complete has resend() called on it.
sub flush_resend {
    my ($self) = @_;
    $logger->debug( "Resending..." . @_RESEND_QUEUE, INTERNAL );
    while ( my $pending = shift @OpenSRF::AppSession::_RESEND_QUEUE ) {
        next if $pending->complete;
        $pending->resend;
    }
}
| 774 | |||||||
| 775 | |||||||
# Let the peer handle process input for up to $timeout (0 when omitted
# or false), then flush the resend queue.
# Returns 0 right away when the session has no peer_handle; otherwise
# returns whatever peer_handle->process($timeout) returned.
sub queue_wait {
    my ($self, $timeout) = @_;
    return 0 unless $self->{peer_handle};

    $timeout ||= 0;
    $logger->debug( "Calling queue_wait($timeout)" , INTERNAL );

    my $processed = $self->{peer_handle}->process($timeout);
    $self->flush_resend;
    return $processed;
}
| 785 | |||||||
# Build a printable dump of the receive queue: toString(1) of every
# queued message, each followed by a newline.  An empty queue yields "".
sub _print_queue {
    my $self = shift;
    my $dump = "";
    for my $queued ( @{ $self->{recv_queue} } ) {
        $dump .= $queued->toString(1) . "\n";
    }
    return $dump;
}
| 794 | |||||||
# Send a STATUS message through this session: the remaining arguments
# are passed to send() after the literal 'STATUS'.  Does nothing (bare
# return) when the invocant is false.
sub status {
    my ($self, @args) = @_;
    return unless $self;
    return $self->send( 'STATUS', @args );
}
| 800 | |||||||
# Restart the receive timeout of one request.
#
#   $tt - passed to $self->app_request() to look the request up.
#
# The request's remaining_recv_timeout is restored to its recv_timeout
# and its timeout_reset flag is raised.  The receive wait loop in this
# file reads $self->{timeout_reset}: when it is set, the loop clears it
# and skips subtracting the time it just spent waiting.
#
# Returns 1 when a request was found and reset, nothing otherwise.
sub reset_request_timeout {
    my ($self, $tt) = @_;

    my $req = $self->app_request($tt);
    return unless $req;    # nothing to reset for an unknown request

    $req->{remaining_recv_timeout} = $req->{recv_timeout};

    # The key has to be 'timeout_reset' -- the name the request
    # constructor initialises and the wait loop checks.  The previous
    # spelling 'timout_reset' was never read, so the elapsed wait was
    # still deducted from the freshly restored timeout.
    $req->{timeout_reset} = 1;

    return 1;
}
| 808 | |||||||
| 809 | #------------------------------------------------------------------------------- | ||||||
| 810 | |||||||
| 811 | package OpenSRF::AppRequest; | ||||||
| 812 | 9 9 9 | 79 31 67 | use base qw/OpenSRF::AppSession/; | ||||
| 813 | 9 9 9 | 64 41 85 | use OpenSRF::Utils::Logger qw/:level/; | ||||
| 814 | 9 9 9 | 65 31 72 | use OpenSRF::DomainObject::oilsResponse qw/:status/; | ||||
| 815 | 9 9 9 | 60 34 63 | use Time::HiRes qw/time usleep/; | ||||
| 816 | |||||||
# Construct a request bound to $session and register it with the session.
#
#   $session - must provide session_threadTrace and last_threadTrace;
#              the first true value of the two becomes the threadTrace.
#   $payload - stored as-is under 'payload'.
#
# The new object starts incomplete, with a 30 second receive timeout
# (both the configured and the remaining value) and an empty receive
# queue.  It is pushed onto the session's request_queue before being
# returned.  May be called on a class name or on an existing instance.
sub new {
    my ($proto, $session, $payload) = @_;
    my $class = ref($proto) || $proto;

    my $trace = $session->session_threadTrace || $session->last_threadTrace;

    my $self = bless {
        session                => $session,
        threadTrace            => $trace,
        payload                => $payload,
        complete               => 0,
        timeout_reset          => 0,
        recv_timeout           => 30,
        remaining_recv_timeout => 30,
        recv_queue             => [],
    }, $class;

    push @{ $self->session->{request_queue} }, $self;

    return $self;
}
| 841 | |||||||
# Get or set the receive timeout.  Passing a defined value stores it as
# both recv_timeout and remaining_recv_timeout.  The stored recv_timeout
# is returned either way.
sub recv_timeout {
    my ($self, $seconds) = @_;
    if (defined $seconds) {
        $self->{$_} = $seconds for qw/recv_timeout remaining_recv_timeout/;
    }
    return $self->{recv_timeout};
}
| 851 | |||||||
# Number of responses currently held in the request's recv_queue.
sub queue_size {
    my ($self) = @_;
    return scalar @{ $self->{recv_queue} };
}
| 856 | |||||||
# Forward the arguments to the owning session's send().
# Nothing is sent (bare return) when the invocant is false, when it has
# no session, or when the request already reports itself complete.
sub send {
    my ($self, @message) = @_;
    return if !$self or !$self->session or $self->complete;
    return $self->session->send(@message);
}
| 862 | |||||||
# Detach the request from its session and wipe it.
# When the request has no session nothing happens.  Otherwise the
# session's remove_app_request() is called with this request and every
# key of the request hash is deleted afterwards.
sub finish {
    my ($self) = @_;

    my $session = $self->session;
    return unless $session;

    $session->remove_app_request($self);
    for my $key (keys %$self) {
        delete $self->{$key};
    }
}
| 869 | |||||||
# Read-only accessor: the session object stored under 'session'.
sub session {
    my ($self) = @_;
    return $self->{session};
}
| 873 | |||||||
# Get or set the completion flag.
#
# Once the stored flag is true it is returned immediately and can no
# longer be changed through this method.  Otherwise:
#   * with a defined argument, the flag is set to it, and when the new
#     value is true _duration is recorded as (now - _start);
#   * with no argument, the session is polled once via queue_wait(0)
#     before the flag is read.
# Returns the stored flag.
sub complete {
    my ($self, $flag) = @_;

    return $self->{complete} if $self->{complete};

    if (!defined $flag) {
        $self->session->queue_wait(0);
        return $self->{complete};
    }

    $self->{complete} = $flag;
    if ($flag) {
        $self->{_duration} = time - $self->{_start};
    }
    return $self->{complete};
}
| 886 | |||||||
# Call wait_complete (with its default timeout) and then return the
# recorded _duration value.
sub duration {
    my ($self) = @_;
    $self->wait_complete;
    return $self->{_duration};
}
| 892 | |||||||
# Poll until the request is complete or the time budget is used up.
#
#   $timeout - budget in seconds; 10 is used when omitted or false.
#
# Each pass calls queue_wait() with the remaining budget and deducts
# the wall-clock time that call took.  Returns the result of a final
# complete() call.
sub wait_complete {
    my ($self, $timeout) = @_;
    my $budget = $timeout || 10;

    until ( $self->complete or $budget <= 0 ) {
        my $began = time;
        $self->queue_wait($budget);
        $budget -= time - $began;
    }

    return $self->complete;
}
| 907 | |||||||
# Read-only accessor: the value stored under 'threadTrace'.
sub threadTrace {
    my ($self) = @_;
    return $self->{threadTrace};
}
| 911 | |||||||
# Add a response to the receive queue.
#
# A false response is ignored and 0 is returned.  When the response is
# an "Error" (per UNIVERSAL::isa) it is also remembered under 'failed'
# and the request is marked complete; the error is still queued like
# any other response.  Returns the value of push (new queue length).
sub push_queue {
    my ($self, $incoming) = @_;
    return 0 unless $incoming;

    if ( UNIVERSAL::isa($incoming, "Error") ) {
        $self->{failed} = $incoming;
        $self->complete(1);
    }

    return push @{ $self->{recv_queue} }, $incoming;
}
| 923 | |||||||
# Read-only accessor: whatever is stored under 'failed' (undef when
# nothing has been recorded).
sub failed {
    my ($self) = @_;
    return $self->{failed};
}
| 928 | |||||||
# Delegate to the owning session's queue_wait(), passing along every
# argument after the invocant and returning its result.
sub queue_wait {
    my ($self, @wait_args) = @_;
    my $session = $self->session;
    return $session->queue_wait(@wait_args);
}
| 933 | |||||||
# Read-only accessor: the value stored under 'payload'.
sub payload {
    my ($self) = @_;
    return $self->{payload};
}
| 935 | |||||||
# Send this request again as a REQUEST message carrying the stored
# payload and threadTrace, and return the session's send() result.
# Nothing happens (bare return) when the invocant is false, has no
# session, or already reports itself complete.
sub resend {
    my ($self) = @_;
    return if !$self or !$self->session or $self->complete;

    OpenSRF::Utils::Logger->debug( "I'm resending the request for threadTrace ". $self->threadTrace, DEBUG);

    return $self->session->send('REQUEST', $self->payload, $self->threadTrace );
}
| 942 | |||||||
# Send $msg as a STATUS message on the owning session, tagged with this
# request's threadTrace.  Nothing is sent (bare return) when the
# invocant is false, has no session, or already reports itself complete.
sub status {
    my ($self, $msg) = @_;
    return if !$self or !$self->session or $self->complete;
    return $self->session->send( 'STATUS', $msg, $self->threadTrace );
}
| 949 | |||||||
| 950 | # TODO stream_push only works when server sessions can accept RESULT | ||||||
| 951 | # messages, which is no longer supported. Create a new OpenSRF message | ||||||
| 952 | # type to support client-to-server streams. | ||||||
| 953 | #sub stream_push { | ||||||
| 954 | # my $self = shift; | ||||||
| 955 | # my $msg = shift; | ||||||
| 956 | # $self->respond( $msg ); | ||||||
| 957 | #} | ||||||
| 958 | |||||||
# Send one RESULT message for this request.
#
#   $msg - either an OpenSRF::DomainObject::oilsResult object, which is
#          sent unchanged, or any other value, which is wrapped in a
#          new oilsResult via its content() setter.
#
# The message is sent through the owning session together with this
# request's threadTrace; the session's send() result is returned.
# Nothing is sent (bare return) when the invocant is false, has no
# session, or already reports itself complete.
sub respond {
    my ($self, $msg) = @_;
    return unless ($self and $self->session and !$self->complete);

    my $response;
    if (ref($msg) && UNIVERSAL::isa($msg, 'OpenSRF::DomainObject::oilsResult')) {
        $response = $msg;
    } else {
        # Explicit class-method call.  The indirect-object form
        # ("new Class") is parsed heuristically, and this package has a
        # new() of its own that such a statement could resolve to.
        $response = OpenSRF::DomainObject::oilsResult->new;
        $response->content($msg);
    }

    return $self->session->send('RESULT', $response, $self->threadTrace);
}
| 974 | |||||||
# Send a final RESULT together with a "Request Complete" STATUS, then
# mark this request complete.
#
#   $msg - either an OpenSRF::DomainObject::oilsResult object, which is
#          sent unchanged, or any other value, which is wrapped in a
#          new oilsResult via its content() setter.
#
# Both messages go out in a single session send() call, followed by
# this request's threadTrace.  Nothing is sent (bare return) when the
# invocant is false, has no session, or already reports itself complete.
# Returns the value of $self->complete(1).
sub respond_complete {
    my ($self, $msg) = @_;
    return unless ($self and $self->session and !$self->complete);

    my $response;
    if (ref($msg) && UNIVERSAL::isa($msg, 'OpenSRF::DomainObject::oilsResult')) {
        $response = $msg;
    } else {
        # Explicit class-method call.  The indirect-object form
        # ("new Class") is parsed heuristically, and this package has a
        # new() of its own that such a statement could resolve to.
        $response = OpenSRF::DomainObject::oilsResult->new;
        $response->content($msg);
    }

    my $stat = OpenSRF::DomainObject::oilsConnectStatus->new(
        statusCode => STATUS_COMPLETE(),
        status     => 'Request Complete',
    );

    $self->session->send( 'RESULT' => $response, 'STATUS' => $stat, $self->threadTrace);
    return $self->complete(1);
}
| 996 | |||||||
# Register $cb with the owning session under the callback name 'death'
# and return whatever the session's register_callback() returns.
sub register_death_callback {
    my ($self, $cb) = @_;
    my $session = $self->session;
    return $session->register_callback( death => $cb );
}
| 1002 | |||||||
| 1003 | |||||||
# Utility method.  Waits for the request, then checks whether it failed.
# If so, throws an OpenSRF::EX::ERROR built from the stringified
# failure.  If everything is ok, returns the content of the first
# response received.
#
#   $finish - when true, finish() is called on the request after the
#             content has been read.
#
# Returns undef when no response was received.
sub gather {
    my ($self, $finish) = @_;

    $self->wait_complete;
    my $resp = $self->recv( timeout => 60 );

    if ( my $failure = $self->failed() ) {
        # Explicit class-method call instead of the indirect-object
        # form "throw Class (...)", which Perl parses heuristically.
        OpenSRF::EX::ERROR->throw( $failure->stringify() );
    }

    # NOTE(review): on this path the request is not finished even when
    # $finish is true -- confirm whether that is intended.
    # An explicit undef is kept so list-context callers still get one value.
    if (!$resp) { return undef; }

    my $content = $resp->content;
    $self->finish() if $finish;
    return $content;
}
| 1021 | |||||||
| 1022 | |||||||
| 1023 | package OpenSRF::AppSubrequest; | ||||||
| 1024 | |||||||
# Collect a response: every defined value is appended to the 'resp'
# list of the sub-request; undef is ignored.
sub respond {
    my ($self, $value) = @_;
    defined $value and push @{ $self->{resp} }, $value;
}
# Same as respond(): the whole argument list is handed over unchanged.
sub respond_complete {
    my @forwarded = @_;
    return respond(@forwarded);
}
| 1031 | |||||||
# Construct a sub-request.  The object starts with an empty 'resp'
# list; any further key/value arguments are stored alongside it (a
# 'resp' argument replaces the default).  May be called on a class name
# or on an existing instance.
sub new {
    my $proto = shift;
    my $class = ref($proto) || $proto;
    my $self  = { resp => [], @_ };
    return bless $self, $class;
}
| 1037 | |||||||
# The collected responses: the list itself in list context, the number
# of entries in scalar context.
sub responses {
    my ($self) = @_;
    return @{ $self->{resp} };
}
| 1039 | |||||||
# Get or set the session of the sub-request.  A true argument is stored
# under 'session'; a false or missing one leaves the stored value
# untouched.  Returns the stored session.
sub session {
    my ($self, $new_session) = @_;
    if ($new_session) {
        $self->{session} = $new_session;
    }
    return $self->{session};
}
| 1046 | |||||||
# Deliberate no-op: takes no action and returns nothing.
sub status {
    return;
}
| 1048 | |||||||
| 1049 | |||||||
| 1050 | 1; | ||||||
| 1051 | |||||||