| File: | blib/lib/OpenSRF/MultiSession.pm |
| Coverage: | 4.2% |
| line | stmt | bran | cond | sub | pod | time | code |
|---|---|---|---|---|---|---|---|
| 1 | package OpenSRF::MultiSession; | ||||||
| 2 | 1 1 1 | 9 4 10 | use OpenSRF::AppSession; | ||||
| 3 | 1 1 1 | 9 3 7 | use OpenSRF::Utils::Logger; | ||||
| 4 | 1 1 1 | 7 3 37 | use Time::HiRes qw/time usleep/; | ||||
| 5 | |||||||
| 6 | my $log = 'OpenSRF::Utils::Logger'; | ||||||
| 7 | |||||||
# Constructor.  Accepts a flat list of key/value pairs which become the
# object's initial fields.  Keys read here:
#   app                   - passed as the first argument to
#                           OpenSRF::AppSession->create
#   api_level             - passed as the second argument; defaults to 1
#   session_hash_function - coderef used by request() to pick a session;
#                           defaults to \&_dummy_session_hash_function
#   cap                   - if true, used for both session_cap and
#                           request_cap when those are not already set
#   session_cap           - number of sessions to create; defaults to 10
#   request_cap           - limit on running requests; defaults to 10
# May be called on a class name or on an existing instance.
# Returns the new object with its session pool already created.
sub new {
    my $proto = shift;
    my $class = ref($proto) || $proto;

    my $self = bless {@_}, $class;

    $self->{api_level} = 1 unless defined $self->{api_level};
    $self->{session_hash_function} = \&_dummy_session_hash_function
        unless defined $self->{session_hash_function};

    # A single "cap" fills in whichever of the two specific caps is unset.
    if ($self->{cap}) {
        $self->session_cap($self->{cap}) unless $self->session_cap;
        $self->request_cap($self->{cap}) unless $self->request_cap;
    }

    # XXX make adaptive the default once the logic is in place
    #$self->adaptive(1);
    $self->session_cap(10) unless $self->session_cap;
    $self->request_cap(10) unless $self->request_cap;

    # Per-object bookkeeping lists; any same-named constructor args are
    # overwritten here.
    $self->{$_} = [] for qw/sessions running completed failed/;

    my $pool = $self->{sessions};
    foreach my $slot (1 .. $self->session_cap) {
        # NOTE(review): the meaning of the trailing 1 is defined by
        # OpenSRF::AppSession->create, which is not visible here.
        push @$pool,
            OpenSRF::AppSession->create(
                $self->{app},
                $self->{api_level},
                1
            );
        $log->debug("Creating connection ".$pool->[-1]->session_id." ...");
    }

    return $self;
}
| 54 | |||||||
# Default session hash function: ignores the request details and returns
# a per-object counter that starts at 1 and grows by one on every call.
# request() reduces the value modulo the session cap, so this walks the
# pool in order.
sub _dummy_session_hash_function {
    my ($multi) = @_;

    unless (exists $multi->{_dummy_hash_counter}) {
        $multi->{_dummy_hash_counter} = 1;
    }

    # Post-increment: hand back the current value, store the next one.
    return $multi->{_dummy_hash_counter}++;
}
| 60 | |||||||
# Call connect on every pooled session that does not already report
# itself as connected.
sub connect {
    my ($self) = @_;
    foreach my $session (@{ $self->{sessions} }) {
        next if $session->connected;
        $session->connect;
    }
}
| 67 | |||||||
# Call finish on every session in the pool.
sub finish {
    my ($self) = @_;
    foreach my $session (@{ $self->{sessions} }) {
        $session->finish;
    }
}
| 72 | |||||||
# Call disconnect on every session in the pool.
sub disconnect {
    my ($self) = @_;
    foreach my $session (@{ $self->{sessions} }) {
        $session->disconnect;
    }
}
| 77 | |||||||
# Get/set accessor for the coderef that request() calls to choose a
# session.  With a defined argument the stored value is replaced first.
# Returns the stored value, or nothing when not called on a reference.
sub session_hash_function {
    my ($self, $session_hash_function) = @_;
    return unless ref $self;

    if (defined $session_hash_function) {
        $self->{session_hash_function} = $session_hash_function;
    }
    return $self->{session_hash_function};
}
| 86 | |||||||
# Get/set accessor for the callback that session_reap() invokes, as
# $handler->($self, $request_entry), for requests that ended in failure.
# With a defined argument the stored value is replaced first.
# Returns the stored value, or nothing when not called on a reference.
sub failure_handler {
    my ($self, $failure_handler) = @_;
    ref $self or return;

    $self->{failure_handler} = $failure_handler
        if defined $failure_handler;

    return $self->{failure_handler};
}
| 95 | |||||||
# Get/set accessor for the callback that session_reap() invokes, as
# $handler->($self, $request_entry), for requests that completed without
# failure.  With a defined argument the stored value is replaced first.
# Returns the stored value, or nothing when not called on a reference.
sub success_handler {
    my ($self, $success_handler) = @_;
    ref $self or return;

    $self->{success_handler} = $success_handler
        if defined $success_handler;

    return $self->{success_handler};
}
| 104 | |||||||
# Get/set accessor for the session cap: new() creates this many sessions
# and request() reduces the session hash modulo this value.  Setting it
# here only stores the number; it does not create or drop sessions.
# Returns the stored value, or nothing when not called on a reference.
sub session_cap {
    my ($self, $cap) = @_;
    return unless ref $self;

    if (defined $cap) {
        $self->{session_cap} = $cap;
    }
    return $self->{session_cap};
}
| 113 | |||||||
# Get/set accessor for the request cap: request() only dispatches while
# the number of running requests is below this value.
# With a defined argument the stored value is replaced first.
# Returns the stored value, or nothing when not called on a reference.
sub request_cap {
    my ($self, $cap) = @_;
    return unless ref $self;

    if (defined $cap) {
        $self->{request_cap} = $cap;
    }
    return $self->{request_cap};
}
| 122 | |||||||
# Get/set accessor for the "adaptive" flag consulted by request().
# With a defined argument (including 0) the stored value is replaced first.
# Returns the stored value, or nothing when not called on a reference.
sub adaptive {
    my ($self, $adapt) = @_;
    ref $self or return;

    $self->{adaptive} = $adapt if defined $adapt;

    return $self->{adaptive};
}
| 131 | |||||||
# Inspect or drain the list of completed request entries.
#   scalar context, no $count : returns how many entries are queued
#   list context, no $count   : removes and returns all queued entries
#   with a $count             : removes up to $count entries from the front
#                               of the list and returns the result of
#                               splice() in the caller's context
# Returns an empty list when a drain is requested and nothing is queued,
# and nothing at all when not called on a reference.
sub completed {
    my ($self, $count) = @_;
    return unless ref $self;

    my $queue = $self->{completed};

    # In list context a missing (or zero) count means "everything".
    if (wantarray && !$count) {
        $count = scalar @$queue;
    }

    return scalar @$queue unless defined $count;
    return () unless @$queue;
    return splice @$queue, 0, $count;
}
| 149 | |||||||
# Inspect or drain the list of failed request entries.
#   scalar context, no $count : returns how many entries are queued
#   list context, no $count   : removes and returns all queued entries
#   with a $count             : removes up to $count entries from the front
#                               of the list and returns the result of
#                               splice() in the caller's context
# Returns an empty list when a drain is requested and nothing is queued,
# and nothing at all when not called on a reference.
sub failed {
    my ($self, $count) = @_;
    return unless ref $self;

    my $queue = $self->{failed};

    # In list context a missing (or zero) count means "everything".
    if (wantarray && !$count) {
        $count = scalar @$queue;
    }

    return scalar @$queue unless defined $count;
    return () unless @$queue;
    return splice @$queue, 0, $count;
}
| 167 | |||||||
# Number of request entries currently in the running list.
# Returns nothing when not called on a reference.
sub running {
    my ($self) = @_;
    ref $self or return;

    my $in_flight = @{ $self->{running} };
    return $in_flight;
}
| 173 | |||||||
| 174 | |||||||
# Dispatch a method call on one of the pooled sessions.
#
# Call forms:
#   $multi->request($method, @params)
#   $multi->request($hash_param, $method, @params)
# A leading reference argument is taken as $hash_param.  When defined it
# is handed to the session hash function ahead of the method name, and it
# is recorded under the "hash" key of the running-request entry.
#
# Finished requests are reaped first.  While the number of running
# requests is at or above request_cap, the call blocks in session_wait()
# until something finishes.
#
# Returns the request object produced by the chosen session's request().
sub request {
    my $self = shift;

    my $hash_param;
    my $method = shift;
    if (ref $method) {
        $hash_param = $method;
        $method = shift;
    }
    my @params = @_;

    $self->session_reap;

    # XXX adaptive throttling is not implemented yet.  Until it is, the
    # adaptive flag does not change dispatch: we always wait for a free
    # slot instead of silently dropping the request when at capacity.
    #
    # Iterating (rather than recursing) keeps the stack flat, and the
    # "something is running" check guarantees progress even if request_cap
    # is zero or unset: with nothing in flight there is nothing to wait for.
    while ($self->running && $self->running >= $self->request_cap) {
        $self->session_wait;
    }

    my $index = $self->session_hash_function->(
        $self,
        (defined $hash_param ? $hash_param : ()),
        $method,
        @params
    );

    # Sessions are only created in new(), so a session_cap changed later
    # may not match the pool.  Never index past the sessions that exist.
    my $pool_size = scalar @{ $self->{sessions} };
    my $slots     = $self->session_cap;
    $slots = $pool_size if (!$slots || $slots > $pool_size);

    my $ses = $self->{sessions}->[$index % $slots];

    my $req = $ses->request( $method, @params );

    push @{ $self->{running} },
        { req    => $req,
          meth   => $method,
          hash   => $hash_param,
          params => [@params]
        };

    $log->debug("Making request [$method] ".$self->running."...");

    return $req;
}
| 214 | |||||||
# Block until running requests have been reaped.
#
#   $multi->session_wait      - poll session_reap() until it reaps at least
#                               one request or nothing is running; returns
#                               the count from the last session_reap() call
#   $multi->session_wait(1)   - poll until nothing is running; returns the
#                               number of requests that were running when
#                               the wait began
#
# Both modes pause 100 microseconds between polls that reaped nothing, so
# waiting never degenerates into a tight spin.
sub session_wait {
    my ($self, $all) = @_;

    if ($all) {
        my $outstanding = $self->running;
        while ($self->running) {
            # Only sleep when this pass made no progress and work remains.
            usleep 100 if ($self->session_reap == 0 && $self->running);
        }
        return $outstanding;
    }

    my $count;
    while (($count = $self->session_reap) == 0 && $self->running) {
        usleep 100;
    }
    return $count;
}
| 233 | |||||||
# Make one pass over the running list and retire every request whose
# complete() is true.
#
# For each finished entry this records "response" (an arrayref of what
# recv returned) and "duration".  If the request's failed() is true, that
# value is stored as "error" and the entry is appended to the failed list;
# otherwise the entry is appended to the completed list.
# Unfinished entries are put back on the running list in their original
# order before any handler runs.
#
# After that, each finished entry is passed to failure_handler or
# success_handler (if one is set) as $handler->($self, $entry), its
# request is finished, and every key of the entry is deleted.
# NOTE(review): the emptied hash is the same one already pushed onto the
# completed/failed list, so those lists end up holding empty hashes once
# this sub returns - confirm that is intended.
#
# Returns the number of requests retired in this pass.
sub session_reap {
    my ($self) = @_;

    my (@finished, @pending);

    while ( my $entry = shift @{ $self->{running} } ) {
        my $request = $entry->{req};

        unless ($request->complete) {
            push @pending, $entry;
            next;
        }

        $entry->{response} = [ $request->recv ];
        $entry->{duration} = $request->duration;

        if ($request->failed) {
            $entry->{error} = $request->failed;
            push @{ $self->{failed} }, $entry;
        } else {
            push @{ $self->{completed} }, $entry;
        }

        push @finished, $entry;
    }

    # Restore the still-running entries before calling any handler.
    push @{ $self->{running} }, @pending;

    foreach my $entry (@finished) {
        my $handler = $entry->{error}
            ? $self->failure_handler
            : $self->success_handler;
        $handler->($self, $entry) if ($handler);

        $entry->{req}->finish;
        delete $entry->{$_} for (keys %$entry);
    }

    return scalar @finished;
}
| 281 | |||||||
| 282 | 1; | ||||||
| 283 | |||||||