| File: | blib/lib/OpenSRF/Transport.pm |
| Coverage: | 27.3% |
| line | stmt | bran | cond | sub | pod | time | code |
|---|---|---|---|---|---|---|---|
| 1 | package OpenSRF::Transport; | ||||||
| 2 | 9 9 9 9 9 9 | 65 34 61 64 31 76 | use strict; use warnings; | ||||
| 3 | 9 9 9 | 62 57 53 | use base 'OpenSRF'; | ||||
| 4 | 9 9 9 | 70 31 60 | use Time::HiRes qw/time/; | ||||
| 5 | 9 9 9 | 81 32 83 | use OpenSRF::AppSession; | ||||
| 6 | 9 9 9 | 66 30 59 | use OpenSRF::Utils::JSON; | ||||
| 7 | 9 9 9 | 55 37 57 | use OpenSRF::Utils::Logger qw(:level); | ||||
| 8 | 9 9 9 | 62 36 77 | use OpenSRF::DomainObject::oilsResponse qw/:status/; | ||||
| 9 | 9 9 9 | 62 35 57 | use OpenSRF::EX qw/:try/; | ||||
| 10 | 9 9 9 | 118 39 110 | use OpenSRF::Transport::SlimJabber::MessageWrapper; | ||||
| 11 | |||||||
| 12 | #------------------ | ||||||
| 13 | # --- These must be implemented by all Transport subclasses | ||||||
| 14 | # ------------------------------------------- | ||||||
| 15 | |||||||
| 16 - 21 | =head2 get_listener Returns the package name of the package the system will use to gather incoming requests =cut | ||||||
| 22 | |||||||
| 23 | 0 | 1 | sub get_listener { shift()->alert_abstract(); } | ||||
| 24 | |||||||
| 25 - 29 | =head2 get_peer_client Returns the name of the package responsible for client communication =cut | ||||||
| 30 | |||||||
| 31 | 0 | 1 | sub get_peer_client { shift()->alert_abstract(); } | ||||
| 32 | |||||||
| 33 - 37 | =head2 get_msg_envelope Returns the name of the package responsible for parsing incoming messages =cut | ||||||
| 38 | |||||||
| 39 | 0 | 1 | sub get_msg_envelope { shift()->alert_abstract(); } | ||||
| 40 | |||||||
| 41 | # ------------------------------------------- | ||||||
| 42 | |||||||
| 43 | our $message_envelope; | ||||||
| 44 | my $logger = "OpenSRF::Utils::Logger"; | ||||||
| 45 | |||||||
| 46 | |||||||
| 47 | |||||||
| 48 - 54 | =head2 message_envelope( [$envelope] ) Sets (when $envelope is given) and returns the message envelope class that will allow us to extract information from the messages we receive from the low-level transport =cut | ||||||
| 55 | |||||||
| 56 | sub message_envelope { | ||||||
| 57 | 0 | 1 | my( $class, $envelope ) = @_; | ||||
| 58 | 0 | if( $envelope ) { | |||||
| 59 | 0 | $message_envelope = $envelope; | |||||
| 60 | 0 | $envelope->use; | |||||
| 61 | 0 | if( $@ ) { | |||||
| 62 | 0 | $logger->error( | |||||
| 63 | "Error loading message_envelope: $envelope -> $@", ERROR); | ||||||
| 64 | } | ||||||
| 65 | } | ||||||
| 66 | 0 | return $message_envelope; | |||||
| 67 | } | ||||||
| 68 | |||||||
| 69 - 76 | =head2 handler( $service, $data ) Extracts the remote_id, session_id, message body, and message type from the incoming message $data. Then, creates or retrieves the AppSession object with the session_id, remote_id, and $service. Finally, creates the message document by decoding the JSON body of the message and calls the handler method on each oilsMessage in that document. =cut | ||||||
| 77 | |||||||
| 78 | sub handler { | ||||||
| 79 | 0 | 1 | my $start_time = time(); | ||||
| 80 | 0 | my( $class, $service, $data ) = @_; | |||||
| 81 | |||||||
| 82 | 0 | $logger->transport( "Transport handler() received $data", INTERNAL ); | |||||
| 83 | |||||||
| 84 | 0 | my $remote_id = $data->from; | |||||
| 85 | 0 | my $sess_id = $data->thread; | |||||
| 86 | 0 | my $body = $data->body; | |||||
| 87 | 0 | my $type = $data->type; | |||||
| 88 | |||||||
| 89 | 0 | $logger->set_osrf_xid($data->osrf_xid); | |||||
| 90 | |||||||
| 91 | |||||||
| 92 | 0 | if (defined($type) and $type eq 'error') { | |||||
| 93 | 0 | throw OpenSRF::EX::Session ("$remote_id IS NOT CONNECTED TO THE NETWORK!!!"); | |||||
| 94 | |||||||
| 95 | } | ||||||
| 96 | |||||||
| 97 | # See if the app_session already exists. If so, make | ||||||
| 98 | # sure the sender hasn't changed if we're a server | ||||||
| 99 | 0 | my $app_session = OpenSRF::AppSession->find( $sess_id ); | |||||
| 100 | 0 | if( $app_session and $app_session->endpoint == $app_session->SERVER() and | |||||
| 101 | $app_session->remote_id ne $remote_id ) { | ||||||
| 102 | |||||||
| 103 | 0 | my $c = OpenSRF::Utils::SettingsClient->new(); | |||||
| 104 | 0 | if($c->config_value("apps", $app_session->service, "migratable")) { | |||||
| 105 | 0 | $logger->debug("service is migratable, new client is $remote_id"); | |||||
| 106 | } else { | ||||||
| 107 | |||||||
| 108 | 0 | $logger->warn("Backend Gone or invalid sender"); | |||||
| 109 | 0 | my $res = OpenSRF::DomainObject::oilsBrokenSession->new(); | |||||
| 110 | 0 | $res->status( "Backend Gone or invalid sender, Reconnect" ); | |||||
| 111 | 0 | $app_session->status( $res ); | |||||
| 112 | 0 | return 1; | |||||
| 113 | } | ||||||
| 114 | } | ||||||
| 115 | |||||||
| 116 | # Retrieve or build the app_session as appropriate (server_build decides which to do) | ||||||
| 117 | 0 | $logger->transport( "AppSession is valid or does not exist yet", INTERNAL ); | |||||
| 118 | 0 | $app_session = OpenSRF::AppSession->server_build( $sess_id, $remote_id, $service ); | |||||
| 119 | |||||||
| 120 | 0 | if( ! $app_session ) { | |||||
| 121 | 0 | throw OpenSRF::EX::Session ("Transport::handler(): No AppSession object returned from server_build()"); | |||||
| 122 | } | ||||||
| 123 | |||||||
| 124 | # Create a document from the JSON contained within the message | ||||||
| 125 | 0 | my $doc; | |||||
| 126 | 0 0 | eval { $doc = OpenSRF::Utils::JSON->JSON2perl($body); }; | |||||
| 127 | 0 | if( $@ ) { | |||||
| 128 | |||||||
| 129 | 0 | $logger->warn("Received bogus JSON: $@"); | |||||
| 130 | 0 | $logger->warn("Bogus JSON data: $body"); | |||||
| 131 | 0 | my $res = OpenSRF::DomainObject::oilsXMLParseError->new( status => "JSON Parse Error --- $body\n\n$@" ); | |||||
| 132 | |||||||
| 133 | 0 | $app_session->status($res); | |||||
| 134 | #$app_session->kill_me; | ||||||
| 135 | 0 | return 1; | |||||
| 136 | } | ||||||
| 137 | |||||||
| 138 | 0 | $logger->transport( "Transport::handler() creating \n$body", INTERNAL ); | |||||
| 139 | |||||||
| 140 | # We need to disconnect the session if we got a jabber error on the client side. For | ||||||
| 141 | # server side, we'll just tear down the session and go away. | ||||||
| 142 | 0 | if (defined($type) and $type eq 'error') { | |||||
| 143 | # If we're a server | ||||||
| 144 | 0 | if( $app_session->endpoint == $app_session->SERVER() ) { | |||||
| 145 | 0 | $app_session->kill_me; | |||||
| 146 | 0 | return 1; | |||||
| 147 | } else { | ||||||
| 148 | 0 | $app_session->reset; | |||||
| 149 | 0 | $app_session->state( $app_session->DISCONNECTED ); | |||||
| 150 | # the commented-out resend below would lead to infinite looping; it should return an exception instead | ||||||
| 151 | #$app_session->push_resend( $app_session->app_request( | ||||||
| 152 | # $doc->documentElement->firstChild->threadTrace ) ); | ||||||
| 153 | 0 | $logger->debug( | |||||
| 154 | "Got Jabber error on client connection $remote_id, nothing we can do..", ERROR ); | ||||||
| 155 | 0 | return 1; | |||||
| 156 | } | ||||||
| 157 | } | ||||||
| 158 | |||||||
| 159 | # cycle through and pass each oilsMessage contained in the message | ||||||
| 160 | # up to the message layer for processing. | ||||||
| 161 | 0 | for my $msg (@$doc) { | |||||
| 162 | |||||||
| 163 | 0 | next unless ( $msg && UNIVERSAL::isa($msg => 'OpenSRF::DomainObject::oilsMessage')); | |||||
| 164 | |||||||
| 165 | 0 | if( $app_session->endpoint == $app_session->SERVER() ) { | |||||
| 166 | |||||||
| 167 | try { | ||||||
| 168 | |||||||
| 169 | 0 0 | if( ! $msg->handler( $app_session ) ) { return 0; } | |||||
| 170 | |||||||
| 171 | 0 | $logger->debug("Successfully handled message", DEBUG); | |||||
| 172 | |||||||
| 173 | } catch Error with { | ||||||
| 174 | |||||||
| 175 | 0 | my $e = shift; | |||||
| 176 | 0 | my $res = OpenSRF::DomainObject::oilsServerError->new(); | |||||
| 177 | 0 | $res->status( $res->status . "\n$e"); | |||||
| 178 | 0 | $app_session->status($res) if $res; | |||||
| 179 | 0 | $app_session->kill_me; | |||||
| 180 | 0 | return 0; | |||||
| 181 | |||||||
| 182 | 0 | }; | |||||
| 183 | |||||||
| 184 | } else { | ||||||
| 185 | |||||||
| 186 | 0 0 | if( ! $msg->handler( $app_session ) ) { return 0; } | |||||
| 187 | 0 | $logger->debug("Successfully handled message", DEBUG); | |||||
| 188 | |||||||
| 189 | } | ||||||
| 190 | |||||||
| 191 | } | ||||||
| 192 | |||||||
| 193 | 0 | $logger->debug(sprintf("Message processing duration: %.3fs",(time() - $start_time)), DEBUG); | |||||
| 194 | |||||||
| 195 | 0 | return $app_session; | |||||
| 196 | } | ||||||
| 197 | |||||||
| 198 | 1; | ||||||