| File: | blib/lib/OpenSRF/Transport/SlimJabber/XMPPReader.pm |
| Coverage: | 20.2% |
| line | stmt | bran | cond | sub | pod | time | code |
|---|---|---|---|---|---|---|---|
| 1 | package OpenSRF::Transport::SlimJabber::XMPPReader; | ||||||
| 2 | 9 9 9 9 9 9 | 56 34 48 60 34 52 | use strict; use warnings; | ||||
| 3 | 9 9 9 | 122 41 139 | use XML::Parser; | ||||
| 4 | 9 9 9 | 77 34 98 | use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK); | ||||
| 5 | 9 9 9 | 64 35 74 | use Time::HiRes qw/time/; | ||||
| 6 | 9 9 9 | 130 40 153 | use OpenSRF::Transport::SlimJabber::XMPPMessage; | ||||
| 7 | 9 9 9 | 74 33 59 | use OpenSRF::Utils::Logger qw/$logger/; | ||||
| 8 | |||||||
| 9 | # ----------------------------------------------------------- | ||||||
| 10 | # Connect, disconnect, and authentication messsage templates | ||||||
| 11 | # ----------------------------------------------------------- | ||||||
| 12 | 9 | 53 | use constant JABBER_CONNECT => | ||||
| 13 | 9 9 | 64 36 | "<stream:stream to='%s' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>"; | ||||
| 14 | |||||||
| 15 | 9 | 49 | use constant JABBER_BASIC_AUTH => | ||||
| 16 | "<iq id='123' type='set'><query xmlns='jabber:iq:auth'>" . | ||||||
| 17 | 9 9 | 62 34 | "<username>%s</username><password>%s</password><resource>%s</resource></query></iq>"; | ||||
| 18 | |||||||
| 19 | 9 9 9 | 60 37 53 | use constant JABBER_DISCONNECT => "</stream:stream>"; | ||||
| 20 | |||||||
| 21 | |||||||
| 22 | # ----------------------------------------------------------- | ||||||
| 23 | # XMPP Stream states | ||||||
| 24 | # ----------------------------------------------------------- | ||||||
| 25 | 9 9 9 | 84 40 50 | use constant DISCONNECTED => 1; | ||||
| 26 | 9 9 9 | 62 35 50 | use constant CONNECT_RECV => 2; | ||||
| 27 | 9 9 9 | 63 39 46 | use constant CONNECTED => 3; | ||||
| 28 | |||||||
| 29 | |||||||
| 30 | # ----------------------------------------------------------- | ||||||
| 31 | # XMPP Message states | ||||||
| 32 | # ----------------------------------------------------------- | ||||||
| 33 | 9 9 9 | 74 33 56 | use constant IN_NOTHING => 1; | ||||
| 34 | 9 9 9 | 59 35 49 | use constant IN_BODY => 2; | ||||
| 35 | 9 9 9 | 59 44 51 | use constant IN_THREAD => 3; | ||||
| 36 | 9 9 9 | 62 33 46 | use constant IN_STATUS => 4; | ||||
| 37 | |||||||
| 38 | |||||||
| 39 | # ----------------------------------------------------------- | ||||||
| 40 | # Constructor, getter/setters | ||||||
| 41 | # ----------------------------------------------------------- | ||||||
| 42 | sub new { | ||||||
| 43 | 0 | 0 | my $class = shift; | ||||
| 44 | 0 | my $socket = shift; | |||||
| 45 | |||||||
| 46 | 0 | my $self = bless({}, $class); | |||||
| 47 | |||||||
| 48 | 0 | $self->{queue} = []; | |||||
| 49 | 0 | $self->{stream_state} = DISCONNECTED; | |||||
| 50 | 0 | $self->{xml_state} = IN_NOTHING; | |||||
| 51 | 0 | $self->socket($socket); | |||||
| 52 | |||||||
| 53 | 0 | my $p = new XML::Parser(Handlers => { | |||||
| 54 | Start => \&start_element, | ||||||
| 55 | End => \&end_element, | ||||||
| 56 | Char => \&characters, | ||||||
| 57 | }); | ||||||
| 58 | |||||||
| 59 | 0 | $self->parser($p->parse_start); # create a push parser | |||||
| 60 | 0 | $self->parser->{_parent_} = $self; | |||||
| 61 | 0 | $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new; | |||||
| 62 | 0 | return $self; | |||||
| 63 | } | ||||||
| 64 | |||||||
| 65 | sub push_msg { | ||||||
| 66 | 0 | 0 | my($self, $msg) = @_; | ||||
| 67 | 0 0 | push(@{$self->{queue}}, $msg) if $msg; | |||||
| 68 | } | ||||||
| 69 | |||||||
| 70 | sub next_msg { | ||||||
| 71 | 0 | 0 | my $self = shift; | ||||
| 72 | 0 0 | return shift @{$self->{queue}}; | |||||
| 73 | } | ||||||
| 74 | |||||||
| 75 | sub peek_msg { | ||||||
| 76 | 0 | 0 | my $self = shift; | ||||
| 77 | 0 0 | return (@{$self->{queue}} > 0); | |||||
| 78 | } | ||||||
| 79 | |||||||
| 80 | sub parser { | ||||||
| 81 | 0 | 0 | my($self, $parser) = @_; | ||||
| 82 | 0 | $self->{parser} = $parser if $parser; | |||||
| 83 | 0 | return $self->{parser}; | |||||
| 84 | } | ||||||
| 85 | |||||||
| 86 | sub socket { | ||||||
| 87 | 0 | 0 | my($self, $socket) = @_; | ||||
| 88 | 0 | $self->{socket} = $socket if $socket; | |||||
| 89 | 0 | return $self->{socket}; | |||||
| 90 | } | ||||||
| 91 | |||||||
| 92 | sub stream_state { | ||||||
| 93 | 0 | 0 | my($self, $stream_state) = @_; | ||||
| 94 | 0 | $self->{stream_state} = $stream_state if $stream_state; | |||||
| 95 | 0 | return $self->{stream_state}; | |||||
| 96 | } | ||||||
| 97 | |||||||
| 98 | sub xml_state { | ||||||
| 99 | 0 | 0 | my($self, $xml_state) = @_; | ||||
| 100 | 0 | $self->{xml_state} = $xml_state if $xml_state; | |||||
| 101 | 0 | return $self->{xml_state}; | |||||
| 102 | } | ||||||
| 103 | |||||||
| 104 | sub message { | ||||||
| 105 | 0 | 0 | my($self, $message) = @_; | ||||
| 106 | 0 | $self->{message} = $message if $message; | |||||
| 107 | 0 | return $self->{message}; | |||||
| 108 | } | ||||||
| 109 | |||||||
| 110 | |||||||
| 111 | # ----------------------------------------------------------- | ||||||
| 112 | # Stream and connection handling methods | ||||||
| 113 | # ----------------------------------------------------------- | ||||||
| 114 | |||||||
| 115 | sub connect { | ||||||
| 116 | 0 | 0 | my($self, $domain, $username, $password, $resource) = @_; | ||||
| 117 | |||||||
| 118 | 0 | $self->send(sprintf(JABBER_CONNECT, $domain)); | |||||
| 119 | 0 | $self->wait(10); | |||||
| 120 | |||||||
| 121 | 0 | unless($self->{stream_state} == CONNECT_RECV) { | |||||
| 122 | 0 | $logger->error("No initial XMPP response from server"); | |||||
| 123 | 0 | return 0; | |||||
| 124 | } | ||||||
| 125 | |||||||
| 126 | 0 | $self->send(sprintf(JABBER_BASIC_AUTH, $username, $password, $resource)); | |||||
| 127 | 0 | $self->wait(10); | |||||
| 128 | |||||||
| 129 | 0 | unless($self->connected) { | |||||
| 130 | 0 | $logger->error('XMPP connect failed'); | |||||
| 131 | 0 | return 0; | |||||
| 132 | } | ||||||
| 133 | |||||||
| 134 | 0 | return 1; | |||||
| 135 | } | ||||||
| 136 | |||||||
| 137 | sub disconnect { | ||||||
| 138 | 0 | 0 | my $self = shift; | ||||
| 139 | 0 | return unless $self->socket; | |||||
| 140 | 0 | if($self->tcp_connected) { | |||||
| 141 | 0 | $self->send(JABBER_DISCONNECT); | |||||
| 142 | 0 | shutdown($self->socket, 2); | |||||
| 143 | } | ||||||
| 144 | 0 | close($self->socket); | |||||
| 145 | } | ||||||
| 146 | |||||||
| 147 | # ----------------------------------------------------------- | ||||||
| 148 | # returns true if this stream is connected to the server | ||||||
| 149 | # ----------------------------------------------------------- | ||||||
| 150 | sub connected { | ||||||
| 151 | 0 | 0 | my $self = shift; | ||||
| 152 | 0 | return ($self->tcp_connected and $self->{stream_state} == CONNECTED); | |||||
| 153 | } | ||||||
| 154 | |||||||
| 155 | # ----------------------------------------------------------- | ||||||
| 156 | # returns true if the socket is connected | ||||||
| 157 | # ----------------------------------------------------------- | ||||||
| 158 | sub tcp_connected { | ||||||
| 159 | 0 | 0 | my $self = shift; | ||||
| 160 | 0 | return ($self->socket and $self->socket->connected); | |||||
| 161 | } | ||||||
| 162 | |||||||
| 163 | # ----------------------------------------------------------- | ||||||
| 164 | # sends pre-formated XML | ||||||
| 165 | # ----------------------------------------------------------- | ||||||
| 166 | sub send { | ||||||
| 167 | 0 | 0 | my($self, $xml) = @_; | ||||
| 168 | 0 | $self->{socket}->print($xml); | |||||
| 169 | } | ||||||
| 170 | |||||||
| 171 | # ----------------------------------------------------------- | ||||||
| 172 | # Puts a file handle into blocking mode | ||||||
| 173 | # ----------------------------------------------------------- | ||||||
| 174 | sub set_block { | ||||||
| 175 | 0 | 0 | my $fh = shift; | ||||
| 176 | 0 | my $flags = fcntl($fh, F_GETFL, 0); | |||||
| 177 | 0 | $flags &= ~O_NONBLOCK; | |||||
| 178 | 0 | fcntl($fh, F_SETFL, $flags); | |||||
| 179 | } | ||||||
| 180 | |||||||
| 181 | |||||||
| 182 | # ----------------------------------------------------------- | ||||||
| 183 | # Puts a file handle into non-blocking mode | ||||||
| 184 | # ----------------------------------------------------------- | ||||||
| 185 | sub set_nonblock { | ||||||
| 186 | 0 | 0 | my $fh = shift; | ||||
| 187 | 0 | my $flags = fcntl($fh, F_GETFL, 0); | |||||
| 188 | 0 | fcntl($fh, F_SETFL, $flags | O_NONBLOCK); | |||||
| 189 | } | ||||||
| 190 | |||||||
| 191 | |||||||
| 192 | sub wait { | ||||||
| 193 | 0 | 0 | my($self, $timeout) = @_; | ||||
| 194 | |||||||
| 195 | 0 | return $self->next_msg if $self->peek_msg; | |||||
| 196 | |||||||
| 197 | 0 | $timeout ||= 0; | |||||
| 198 | 0 | $timeout = undef if $timeout < 0; | |||||
| 199 | 0 | my $socket = $self->{socket}; | |||||
| 200 | |||||||
| 201 | 0 | set_block($socket); | |||||
| 202 | |||||||
| 203 | # build the select readset | ||||||
| 204 | 0 | my $infile = ''; | |||||
| 205 | 0 | vec($infile, $socket->fileno, 1) = 1; | |||||
| 206 | 0 | return undef unless select($infile, undef, undef, $timeout); | |||||
| 207 | |||||||
| 208 | # now slurp the data off the socket | ||||||
| 209 | 0 | my $buf; | |||||
| 210 | 0 | my $read_size = 1024; | |||||
| 211 | 0 | my $nonblock = 0; | |||||
| 212 | 0 | while(my $n = sysread($socket, $buf, $read_size)) { | |||||
| 213 | 0 | $self->{parser}->parse_more($buf) if $buf; | |||||
| 214 | 0 | if($n < $read_size or $self->peek_msg) { | |||||
| 215 | 0 | set_block($socket) if $nonblock; | |||||
| 216 | 0 | last; | |||||
| 217 | } | ||||||
| 218 | 0 | set_nonblock($socket) unless $nonblock; | |||||
| 219 | 0 | $nonblock = 1; | |||||
| 220 | } | ||||||
| 221 | |||||||
| 222 | 0 | return $self->next_msg; | |||||
| 223 | } | ||||||
| 224 | |||||||
| 225 | # ----------------------------------------------------------- | ||||||
| 226 | # Waits up to timeout seconds for a fully-formed XMPP | ||||||
| 227 | # message to arrive. If timeout is < 0, waits indefinitely | ||||||
| 228 | # ----------------------------------------------------------- | ||||||
| 229 | sub wait_msg { | ||||||
| 230 | 0 | 0 | my($self, $timeout) = @_; | ||||
| 231 | 0 | my $xml; | |||||
| 232 | |||||||
| 233 | 0 | $timeout = 0 unless defined $timeout; | |||||
| 234 | |||||||
| 235 | 0 | if($timeout < 0) { | |||||
| 236 | 0 | while(1) { | |||||
| 237 | 0 | return $xml if $xml = $self->wait($timeout); | |||||
| 238 | } | ||||||
| 239 | |||||||
| 240 | } else { | ||||||
| 241 | 0 | while($timeout >= 0) { | |||||
| 242 | 0 | my $start = time; | |||||
| 243 | 0 | return $xml if $xml = $self->wait($timeout); | |||||
| 244 | 0 | $timeout -= time - $start; | |||||
| 245 | } | ||||||
| 246 | } | ||||||
| 247 | |||||||
| 248 | 0 | return undef; | |||||
| 249 | } | ||||||
| 250 | |||||||
| 251 | |||||||
| 252 | # ----------------------------------------------------------- | ||||||
| 253 | # SAX Handlers | ||||||
| 254 | # ----------------------------------------------------------- | ||||||
| 255 | |||||||
| 256 | |||||||
| 257 | sub start_element { | ||||||
| 258 | 0 | 0 | my($parser, $name, %attrs) = @_; | ||||
| 259 | 0 | my $self = $parser->{_parent_}; | |||||
| 260 | |||||||
| 261 | 0 | if($name eq 'message') { | |||||
| 262 | |||||||
| 263 | 0 | my $msg = $self->{message}; | |||||
| 264 | 0 | $msg->{to} = $attrs{'to'}; | |||||
| 265 | 0 | $msg->{from} = $attrs{router_from} if $attrs{router_from}; | |||||
| 266 | 0 | $msg->{from} = $attrs{from} unless $msg->{from}; | |||||
| 267 | 0 | $msg->{osrf_xid} = $attrs{'osrf_xid'}; | |||||
| 268 | 0 | $msg->{type} = $attrs{type}; | |||||
| 269 | |||||||
| 270 | } elsif($name eq 'body') { | ||||||
| 271 | 0 | $self->{xml_state} = IN_BODY; | |||||
| 272 | |||||||
| 273 | } elsif($name eq 'thread') { | ||||||
| 274 | 0 | $self->{xml_state} = IN_THREAD; | |||||
| 275 | |||||||
| 276 | } elsif($name eq 'stream:stream') { | ||||||
| 277 | 0 | $self->{stream_state} = CONNECT_RECV; | |||||
| 278 | |||||||
| 279 | } elsif($name eq 'iq') { | ||||||
| 280 | 0 | if($attrs{type} and $attrs{type} eq 'result') { | |||||
| 281 | 0 | $self->{stream_state} = CONNECTED; | |||||
| 282 | } | ||||||
| 283 | |||||||
| 284 | } elsif($name eq 'status') { | ||||||
| 285 | 0 | $self->{xml_state } = IN_STATUS; | |||||
| 286 | |||||||
| 287 | } elsif($name eq 'stream:error') { | ||||||
| 288 | 0 | $self->{stream_state} = DISCONNECTED; | |||||
| 289 | |||||||
| 290 | } elsif($name eq 'error') { | ||||||
| 291 | 0 | $self->{message}->{err_type} = $attrs{'type'}; | |||||
| 292 | 0 | $self->{message}->{err_code} = $attrs{'code'}; | |||||
| 293 | } | ||||||
| 294 | } | ||||||
| 295 | |||||||
| 296 | sub characters { | ||||||
| 297 | 0 | 0 | my($parser, $chars) = @_; | ||||
| 298 | 0 | my $self = $parser->{_parent_}; | |||||
| 299 | 0 | my $state = $self->{xml_state}; | |||||
| 300 | |||||||
| 301 | 0 | if($state == IN_BODY) { | |||||
| 302 | 0 | $self->{message}->{body} .= $chars; | |||||
| 303 | |||||||
| 304 | } elsif($state == IN_THREAD) { | ||||||
| 305 | 0 | $self->{message}->{thread} .= $chars; | |||||
| 306 | |||||||
| 307 | } elsif($state == IN_STATUS) { | ||||||
| 308 | 0 | $self->{message}->{status} .= $chars; | |||||
| 309 | } | ||||||
| 310 | } | ||||||
| 311 | |||||||
| 312 | sub end_element { | ||||||
| 313 | 0 | 0 | my($parser, $name) = @_; | ||||
| 314 | 0 | my $self = $parser->{_parent_}; | |||||
| 315 | 0 | $self->{xml_state} = IN_NOTHING; | |||||
| 316 | |||||||
| 317 | 0 | if($name eq 'message') { | |||||
| 318 | 0 | $self->push_msg($self->{message}); | |||||
| 319 | 0 | $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new; | |||||
| 320 | |||||||
| 321 | } elsif($name eq 'stream:stream') { | ||||||
| 322 | 0 | $self->{stream_state} = DISCONNECTED; | |||||
| 323 | } | ||||||
| 324 | } | ||||||
| 325 | |||||||
| 326 | |||||||
| 327 | # read all the data on the jabber socket through the | ||||||
| 328 | # parser and drop the resulting message | ||||||
| 329 | sub flush_socket { | ||||||
| 330 | 0 | 0 | my $self = shift; | ||||
| 331 | 0 | return 0 unless $self->connected; | |||||
| 332 | |||||||
| 333 | 0 | while ($self->wait(0)) { | |||||
| 334 | # TODO remove this log line | ||||||
| 335 | 0 | $logger->info("flushing data from socket..."); | |||||
| 336 | } | ||||||
| 337 | |||||||
| 338 | 0 | return $self->connected; | |||||
| 339 | } | ||||||
| 340 | |||||||
| 341 | |||||||
| 342 | |||||||
| 343 | 1; | ||||||
| 344 | |||||||