This patch is generated from the async-calls branch of HEAD in squid3 Tue Apr 22 00:24:12 2008 GMT See http://devel.squid-cache.org/ Index: squid3/src/AsyncCall.h diff -u squid3/src/AsyncCall.h:1.5 squid3/src/AsyncCall.h:1.3.22.24 --- squid3/src/AsyncCall.h:1.5 Tue Feb 26 13:50:51 2008 +++ squid3/src/AsyncCall.h Mon Apr 21 15:32:38 2008 @@ -5,9 +5,9 @@ #ifndef SQUID_ASYNCCALL_H #define SQUID_ASYNCCALL_H -//#include "cbdata.h" +#include "cbdata.h" #include "event.h" -//#include "TextException.h" +#include "TextException.h" /** \defgroup AsynCallsAPI Async-Calls API @@ -29,6 +29,7 @@ * debugging. */ + class CallDialer; class AsyncCallQueue; @@ -90,6 +91,7 @@ return os; } + /** \ingroup AsyncCallAPI * Interface for all async call dialers Index: squid3/src/BodyPipe.cc diff -u squid3/src/BodyPipe.cc:1.9 squid3/src/BodyPipe.cc:1.7.4.13 --- squid3/src/BodyPipe.cc:1.9 Thu Apr 17 14:52:44 2008 +++ squid3/src/BodyPipe.cc Mon Apr 21 15:32:39 2008 @@ -99,7 +99,7 @@ { debugs(91,7, HERE << this << " will not produce for " << pipe << "; atEof: " << atEof); - assert(pipe != NULL); // be strict: the caller state may depend on this + Must(pipe != NULL); // be strict: the caller state may depend on this pipe->clearProducer(atEof); pipe = NULL; } @@ -112,7 +112,7 @@ void BodyConsumer::stopConsumingFrom(RefCount &pipe) { debugs(91,7, HERE << this << " will not consume from " << pipe); - assert(pipe != NULL); // be strict: the caller state may depend on this + Must(pipe != NULL); // be strict: the caller state may depend on this pipe->clearConsumer(); pipe = NULL; } @@ -141,14 +141,14 @@ void BodyPipe::setBodySize(uint64_t aBodySize) { - assert(!bodySizeKnown()); - assert(aBodySize >= 0); - assert(thePutSize <= aBodySize); + Must(!bodySizeKnown()); + Must(aBodySize >= 0); + Must(thePutSize <= aBodySize); // If this assert fails, we need to add code to check for eof and inform // the consumer about the eof condition via scheduleBodyEndNotification, // because just setting a body size limit may trigger the eof condition. - assert(!theConsumer); + Must(!theConsumer); theBodySize = aBodySize; debugs(91,7, HERE << "set body size" << status()); @@ -156,13 +156,13 @@ uint64_t BodyPipe::bodySize() const { - assert(bodySizeKnown()); + Must(bodySizeKnown()); return static_cast(theBodySize); } bool BodyPipe::expectMoreAfter(uint64_t offset) const { - assert(theGetSize <= offset); + Must(theGetSize <= offset); return offset < thePutSize || // buffer has more now or (!productionEnded() && mayNeedMoreData()); // buffer will have more } @@ -191,7 +191,7 @@ debugs(91,3, HERE << "aborting on premature eof" << status()); } else { // asserta that we can detect the abort if the consumer joins later - assert(!bodySizeKnown() || bodySize() != thePutSize); + Must(!bodySizeKnown() || bodySize() != thePutSize); } scheduleBodyEndNotification(); } @@ -215,8 +215,8 @@ bool BodyPipe::setConsumerIfNotLate(Consumer *aConsumer) { - assert(!theConsumer); - assert(aConsumer); + Must(!theConsumer); + Must(aConsumer); // TODO: convert this into an exception and remove IfNotLate suffix // If there is something consumed already, we are in an auto-consuming mode @@ -302,7 +302,7 @@ MemBuf & BodyPipe::checkOut() { - assert(!isCheckedOut); + Must(!isCheckedOut); isCheckedOut = true; return theBuf; } @@ -310,7 +310,7 @@ void BodyPipe::checkIn(Checkout &checkout) { - assert(isCheckedOut); + Must(isCheckedOut); isCheckedOut = false; const size_t currentSize = theBuf.contentSize(); if (checkout.checkedOutSize > currentSize) @@ -323,14 +323,14 @@ void BodyPipe::undoCheckOut(Checkout &checkout) { - assert(isCheckedOut); + Must(isCheckedOut); const size_t currentSize = theBuf.contentSize(); // We can only undo if size did not change, and even that carries // some risk. If this becomes a problem, the code checking out // raw buffers should always check them in (possibly unchanged) // instead of relying on the automated undo mechanism of Checkout. // The code can always use a temporary buffer to accomplish that. - assert(checkout.checkedOutSize == currentSize); + Must(checkout.checkedOutSize == currentSize); } // TODO: Optimize: inform consumer/producer about more data/space only if @@ -338,7 +338,7 @@ void BodyPipe::postConsume(size_t size) { - assert(!isCheckedOut); + Must(!isCheckedOut); theGetSize += size; debugs(91,7, HERE << "consumed " << size << " bytes" << status()); if (mayNeedMoreData()){ @@ -352,7 +352,7 @@ void BodyPipe::postAppend(size_t size) { - assert(!isCheckedOut); + Must(!isCheckedOut); thePutSize += size; debugs(91,7, HERE << "added " << size << " bytes" << status()); @@ -453,7 +453,7 @@ void BodyPipeCheckout::checkIn() { - assert(!checkedIn); + Must(!checkedIn); pipe.checkIn(*this); checkedIn = true; } Index: squid3/src/Debug.h diff -u squid3/src/Debug.h:1.14 squid3/src/Debug.h:1.10.4.7 --- squid3/src/Debug.h:1.14 Thu Mar 20 18:22:17 2008 +++ squid3/src/Debug.h Mon Apr 21 15:32:41 2008 @@ -51,6 +51,11 @@ #define assert(EX) ((EX)?((void)0):xassert("EX", __FILE__, __LINE__)) #endif +void WillCatchException(int debug_section, int debug_level, const char *who); +void WontCatchException(); +extern int TheSalvagedAsserts; +extern int TheAssertsPerStep; + /* defined debug section limits */ #define MAX_DEBUG_SECTIONS 100 Index: squid3/src/ESIInclude.cc diff -u squid3/src/ESIInclude.cc:1.17 squid3/src/ESIInclude.cc:1.14.4.3 --- squid3/src/ESIInclude.cc:1.17 Tue Feb 12 16:50:41 2008 +++ squid3/src/ESIInclude.cc Wed Apr 2 15:48:23 2008 @@ -110,7 +110,8 @@ rep = NULL; esiStream->include->fail (esiStream); esiStream->finished = 1; - httpRequestFree (http); + http->httpRequestFree (); + cbdataInternalUnlock(http); return; } @@ -157,7 +158,8 @@ debugs(86, 5, "Finished reading upstream data in subrequest"); esiStream->include->subRequestDone (esiStream, true); esiStream->finished = 1; - httpRequestFree (http); + http->httpRequestFree (); + cbdataInternalUnlock(http); return; } @@ -181,14 +183,16 @@ debugs(86, 3, "ESI subrequest finished OK"); esiStream->include->subRequestDone (esiStream, true); esiStream->finished = 1; - httpRequestFree (http); + http->httpRequestFree (); + cbdataInternalUnlock(http); return; case STREAM_FAILED: debugs(86, 1, "ESI subrequest failed transfer"); esiStream->include->fail (esiStream); esiStream->finished = 1; - httpRequestFree (http); + http->httpRequestFree (); + cbdataInternalUnlock(http); return; case STREAM_NONE: { Index: squid3/src/cf.data.pre diff -u squid3/src/cf.data.pre:1.178 squid3/src/cf.data.pre:1.155.4.6 --- squid3/src/cf.data.pre:1.178 Sat Apr 12 17:13:46 2008 +++ squid3/src/cf.data.pre Mon Apr 21 15:33:17 2008 @@ -5682,4 +5682,21 @@ rounded to 1000. DOC_END +NAME: assert_burst_max +TYPE: int +LOC: Config.assert_burst_max +DEFAULT: 0 +DOC_START + When this is set to a possitive number then Squid will try to + survive from assertions if possible and will die only if an + assertions burst exceeds this number. + If set to zero (default), the first assertion aborts Squid, + giving users the old behavior. If set to a negative number, + there is no limit. + An asssertions burst defined as the number of assertions per + single Squid main loop iteration. + WARNING! This is an experimental feature and the definition + of a "burst" can change +DOC_END + EOF Index: squid3/src/client_side.cc diff -u squid3/src/client_side.cc:1.154 squid3/src/client_side.cc:1.139.4.13 --- squid3/src/client_side.cc:1.154 Mon Apr 14 17:13:14 2008 +++ squid3/src/client_side.cc Mon Apr 21 15:33:20 2008 @@ -251,8 +251,9 @@ if (connRegistered_) deRegisterWithConn(); - - httpRequestFree(http); + + http->httpRequestFree(); + cbdataReferenceDone(http); /* clean up connection links to us */ assert(this != next.getRaw()); @@ -304,7 +305,7 @@ ClientSocketContext *newContext; assert(http != NULL); newContext = new ClientSocketContext; - newContext->http = http; + newContext->http = cbdataReference(http); return newContext; } @@ -557,13 +558,13 @@ clientStreamAbort((clientStreamNode *)client_stream.tail->data, this); } -void +/*void httpRequestFree(void *data) { ClientHttpRequest *http = (ClientHttpRequest *)data; assert(http != NULL); delete http; -} +}*/ bool ConnStateData::areAllContextsForThisConnection() const @@ -587,17 +588,17 @@ ClientSocketContext::Pointer context; while ((context = getCurrentContext()).getRaw() != NULL) { - assert(getCurrentContext() != + Must(getCurrentContext() != getCurrentContext()->next); context->connIsFinished(); - assert (context != currentobject); + Must(context != currentobject); } } /* This is a handler normally called by comm_close() */ void ConnStateData::connStateClosed(const CommCloseCbParams &io) { - assert (fd == io.fd); + Must (fd == io.fd); close(); } Index: squid3/src/client_side_request.cc diff -u squid3/src/client_side_request.cc:1.93 squid3/src/client_side_request.cc:1.79.4.15 --- squid3/src/client_side_request.cc:1.93 Sat Apr 12 17:13:46 2008 +++ squid3/src/client_side_request.cc Mon Apr 21 15:33:21 2008 @@ -142,7 +142,7 @@ } ClientHttpRequest::ClientHttpRequest(ConnStateData * aConn) : -#if USE_ADAPTATION +#if USE_ADAPTATION || USE_SSL AsyncJob("ClientHttpRequest"), #endif loggingEntry_(NULL) @@ -162,7 +162,7 @@ bool ClientHttpRequest::onlyIfCached()const { - assert(request); + Must(request); return request->cache_control && EBIT_TEST(request->cache_control->mask, CC_ONLY_IF_CACHED); } @@ -242,7 +242,7 @@ /* the ICP check here was erroneous * - StoreEntry::releaseRequest was always called if entry was valid */ - assert(logType < LOG_TYPE_MAX); + Must(logType < LOG_TYPE_MAX); logRequest(); @@ -292,6 +292,8 @@ http->req_sz = 0; tempBuffer.length = taillen; tempBuffer.data = tailbuf; + /*http should be unLocked/released by the caller module (ESI module for example)*/ + cbdataInternalLock(http); /* client stream setup */ clientStreamInit(&http->client_stream, clientGetMoreData, clientReplyDetach, clientReplyStatus, new clientReplyContext(http), streamcallback, @@ -904,7 +906,7 @@ debugs(85, 4, "ClientHttpRequest::httpStart: " << log_tags[logType] << " for '" << uri << "'"); /* no one should have touched this */ - assert(out.offset == 0); + Must(out.offset == 0); /* Use the Stream Luke */ clientStreamNode *node = (clientStreamNode *)client_stream.tail->data; clientStreamRead(node, this, node->readBuffer); @@ -931,6 +933,7 @@ } // called when comm_write has completed +/* static void SslBumpEstablish(int, char *, size_t, comm_err_t errflag, int, void *data) { @@ -940,15 +943,15 @@ assert(r && cbdataReferenceValid(r)); r->sslBumpEstablish(errflag); } - +*/ void -ClientHttpRequest::sslBumpEstablish(comm_err_t errflag) +ClientHttpRequest::sslBumpEstablish(const CommIoCbParams &io) { // Bail out quickly on COMM_ERR_CLOSING - close handlers will tidy up - if (errflag == COMM_ERR_CLOSING) + if (io.flag == COMM_ERR_CLOSING) return; - if (errflag) { + if (io.flag) { getConn()->startClosing("CONNECT response failure in SslBump"); return; } @@ -968,8 +971,11 @@ // TODO: Unify with tunnel.cc and add a Server(?) header static const char *const conn_established = "HTTP/1.0 200 Connection established\r\n\r\n"; - comm_write(fd, conn_established, strlen(conn_established), - &SslBumpEstablish, this, NULL); + + typedef CommCbMemFunT Dialer; + Dialer dialer(this, &ClientHttpRequest::sslBumpEstablish); + AsyncCall::Pointer call = asyncCall(85, 5, "ClientHttpRequest::sslBumpEstablish", dialer); + comm_write(fd, conn_established, strlen(conn_established), call); } #endif @@ -980,7 +986,7 @@ /** TODO: should be querying the stream. */ int64_t contentLength = memObject()->getReply()->bodySize(request->method); - assert(contentLength >= 0); + Must(contentLength >= 0); if (out.offset < contentLength) return false; @@ -1043,7 +1049,7 @@ void ClientHttpRequest::doCallouts() { - assert(calloutContext); + Must(calloutContext); if (!calloutContext->http_access_done) { debugs(83, 3, HERE << "Doing calloutContext->clientAccessCheck()"); @@ -1064,7 +1070,7 @@ if (!calloutContext->redirect_done) { calloutContext->redirect_done = true; - assert(calloutContext->redirect_state == REDIRECT_NONE); + Must(calloutContext->redirect_state == REDIRECT_NONE); if (Config.Program.redirect) { debugs(83, 3, HERE << "Doing calloutContext->clientRedirectStart()"); @@ -1137,8 +1143,8 @@ return false; } - assert(!virginHeadSource); - assert(!adaptedBodySource); + Must(!virginHeadSource); + Must(!adaptedBodySource); virginHeadSource = initiateAdaptation(service->makeXactLauncher( this, request, NULL)); @@ -1148,8 +1154,8 @@ void ClientHttpRequest::noteAdaptationAnswer(HttpMsg *msg) { - assert(cbdataReferenceValid(this)); // indicates bug - assert(msg); + Must(cbdataReferenceValid(this)); // indicates bug + Must(msg); if (HttpRequest *new_req = dynamic_cast(msg)) { /* @@ -1163,14 +1169,14 @@ xfree(uri); uri = xstrdup(urlCanonical(request)); setLogUri(this, urlCanonicalClean(request)); - assert(request->method.id()); + Must(request->method.id()); } else if (HttpReply *new_rep = dynamic_cast(msg)) { debugs(85,3,HERE << "REQMOD reply is HTTP reply"); // subscribe to receive reply body if (new_rep->body_pipe != NULL) { adaptedBodySource = new_rep->body_pipe; - assert(adaptedBodySource->setConsumerIfNotLate(this)); + Must(adaptedBodySource->setConsumerIfNotLate(this)); } clientStreamNode *node = (clientStreamNode *)client_stream.tail->prev->data; @@ -1198,15 +1204,15 @@ ClientHttpRequest::noteAdaptationQueryAbort(bool final) { clearAdaptation(virginHeadSource); - assert(!adaptedBodySource); + Must(!adaptedBodySource); handleAdaptationFailure(!final); } void ClientHttpRequest::noteMoreBodyDataAvailable(BodyPipe::Pointer) { - assert(request_satisfaction_mode); - assert(adaptedBodySource != NULL); + Must(request_satisfaction_mode); + Must(adaptedBodySource != NULL); if (const size_t contentSize = adaptedBodySource->buf().contentSize()) { BodyPipeCheckout bpc(*adaptedBodySource); @@ -1226,12 +1232,12 @@ void ClientHttpRequest::noteBodyProductionEnded(BodyPipe::Pointer) { - assert(!virginHeadSource); + Must(!virginHeadSource); if (adaptedBodySource != NULL) { // did not end request satisfaction yet // We do not expect more because noteMoreBodyDataAvailable always // consumes everything. We do not even have a mechanism to consume // leftovers after noteMoreBodyDataAvailable notifications seize. - assert(adaptedBodySource->exhausted()); + Must(adaptedBodySource->exhausted()); endRequestSatisfaction(); } } @@ -1239,7 +1245,7 @@ void ClientHttpRequest::endRequestSatisfaction() { debugs(85,4, HERE << this << " ends request satisfaction"); - assert(request_satisfaction_mode); + Must(request_satisfaction_mode); stopConsumingFrom(adaptedBodySource); // TODO: anything else needed to end store entry formation correctly? @@ -1249,7 +1255,7 @@ void ClientHttpRequest::noteBodyProducerAborted(BodyPipe::Pointer) { - assert(!virginHeadSource); + Must(!virginHeadSource); stopConsumingFrom(adaptedBodySource); handleAdaptationFailure(); } @@ -1274,7 +1280,7 @@ clientStreamNode *node = (clientStreamNode *)client_stream.tail->prev->data; clientReplyContext *repContext = dynamic_cast(node->data.getRaw()); - assert(repContext); + Must(repContext); // The original author of the code also wanted to pass an errno to // setReplyToError, but it seems unlikely that the errno reflects the Index: squid3/src/client_side_request.h diff -u squid3/src/client_side_request.h:1.37 squid3/src/client_side_request.h:1.30.4.12 --- squid3/src/client_side_request.h:1.37 Sat Apr 12 17:13:46 2008 +++ squid3/src/client_side_request.h Mon Apr 21 15:33:22 2008 @@ -63,13 +63,15 @@ #if USE_ADAPTATION : public Adaptation::Initiator, // to start adaptation transactions public BodyConsumer // to receive reply bodies in request satisf. mode +#elif USE_SSL + : virtual public AsyncJob #endif { public: void *operator new (size_t); void operator delete (void *); -#if USE_ADAPTATION +#if USE_ADAPTATION || USE_SSL void *toCbdata() { return this; } #endif ClientHttpRequest(ConnStateData *); @@ -136,10 +138,13 @@ ClientRequestContext *calloutContext; void doCallouts(); -#if USE_ADAPTATION +#if USE_ADAPTATION || USE_SSL // AsyncJob virtual methods virtual bool doneAll() const { return Initiator::doneAll() && BodyConsumer::doneAll() && false;} + void httpRequestFree() { deleteThis("httpRequestFree"); } +#else + void httpRequestFree() { delete this; } #endif private: @@ -153,7 +158,7 @@ public: bool sslBumpNeeded() const; void sslBumpStart(); - void sslBumpEstablish(comm_err_t errflag); + void sslBumpEstablish(const CommIoCbParams &io); #endif #if USE_ADAPTATION Index: squid3/src/debug.cc diff -u squid3/src/debug.cc:1.28 squid3/src/debug.cc:1.18.4.8 --- squid3/src/debug.cc:1.28 Mon Apr 14 17:13:14 2008 +++ squid3/src/debug.cc Mon Apr 21 15:33:23 2008 @@ -37,6 +37,7 @@ #include "Debug.h" #include "SquidTime.h" +#include "TextException.h" /* for Config */ #include "structs.h" @@ -44,6 +45,11 @@ int Debug::Levels[MAX_DEBUG_SECTIONS]; int Debug::level; +static bool AsyncCall_Handling_Exceptions = 0; +int TheCascadingAsserts = 0; +int TheSalvagedAsserts = 0; +int TheAssertsPerStep = 0; + static char *debug_log_file = NULL; static int Ctx_Lock = 0; static const char *debugLogTime(void); @@ -569,9 +575,48 @@ return buf; } +void WillCatchException(int debug_section, int debug_level, const char *who){ + if(AsyncCall_Handling_Exceptions) { + debugs(0, 0, "AsyncCall handling exceptions already enabled! The last caller is:" << who ); + abort(); + } + debugs(debug_section, debug_level, "The " << who << " will handle exceptions"); + AsyncCall_Handling_Exceptions = true; +} + +void WontCatchException(){ + AsyncCall_Handling_Exceptions = false; + TheCascadingAsserts = 0; +} + +#define MAX_CASCADING_ASSERTS 10 void xassert(const char *msg, const char *file, int line) { - debugs(0, 0, "assertion failed: " << file << ":" << line << ": \"" << msg << "\""); + + debugs(0, 0, "assertion failed: " << file << ":" << line << ": \"" << msg << "\""); + + if (AsyncCall_Handling_Exceptions && + TheCascadingAsserts < MAX_CASCADING_ASSERTS && + ( Config.assert_burst_max < 0 || + (Config.assert_burst_max > 0 && TheAssertsPerStep < Config.assert_burst_max) + ) + ) { + TheCascadingAsserts++; + TheSalvagedAsserts++; + TheAssertsPerStep++; + + debugs(0, 0, "salvaging assertion #" << TheSalvagedAsserts << " (" << + TheAssertsPerStep << "/" << Config.assert_burst_max << ")"); + + throw TextException(msg, file, line); + } + + + if(TheCascadingAsserts >= MAX_CASCADING_ASSERTS) + debugs(0, 0, "dying after " << TheCascadingAsserts << "cascading assertions" ); + + if(Config.assert_burst_max > 0 && TheAssertsPerStep >= Config.assert_burst_max) + debugs(0, 0, "dying after an " << TheAssertsPerStep << " assertions burst" ); if (!shutting_down) abort(); Index: squid3/src/event.cc diff -u squid3/src/event.cc:1.19 squid3/src/event.cc:1.16.4.5 --- squid3/src/event.cc:1.19 Tue Feb 12 22:51:18 2008 +++ squid3/src/event.cc Mon Apr 21 15:33:24 2008 @@ -43,6 +43,7 @@ static OBJH eventDump; static const char *last_event_ran = NULL; + // This AsyncCall dialer can be configured to check that the event cbdata is // valid before calling the event handler class EventDialer: public CallDialer Index: squid3/src/http.cc diff -u squid3/src/http.cc:1.133 squid3/src/http.cc:1.122.4.13 --- squid3/src/http.cc:1.133 Mon Apr 14 17:13:14 2008 +++ squid3/src/http.cc Mon Apr 21 15:33:28 2008 @@ -162,14 +162,6 @@ { return fd; } -/* -static void -httpStateFree(int fd, void *data) -{ - HttpStateData *httpState = static_cast(data); - debugs(11, 5, "httpStateFree: FD " << fd << ", httpState=" << data); - delete httpState; -}*/ void HttpStateData::httpStateConnClosed(const CommCloseCbParams ¶ms) @@ -265,7 +257,7 @@ if (!remove && !forbidden) return; - assert(e->mem_obj); + Must(e->mem_obj); if (e->mem_obj->request) pe = storeGetPublicByRequest(e->mem_obj->request); @@ -273,7 +265,7 @@ pe = storeGetPublic(e->mem_obj->url, e->mem_obj->method); if (pe != NULL) { - assert(e != pe); + Must(e != pe); pe->release(); } @@ -287,7 +279,7 @@ pe = storeGetPublic(e->mem_obj->url, METHOD_HEAD); if (pe != NULL) { - assert(e != pe); + Must(e != pe); pe->release(); } @@ -322,7 +314,7 @@ pe = storeGetPublic(e->mem_obj->url, METHOD_GET); if (pe != NULL) { - assert(e != pe); + Must(e != pe); pe->release(); } @@ -704,7 +696,7 @@ Ctx ctx = ctx_enter(entry->mem_obj->url); debugs(11, 3, "processReplyHeader: key '" << entry->getMD5Text() << "'"); - assert(!flags.headers_parsed); + Must(!flags.headers_parsed); http_status error = HTTP_STATUS_NONE; @@ -735,8 +727,8 @@ } if (!parsed) { // need more data - assert(!error); - assert(!eof); + Must(!error); + Must(!eof); delete newrep; ctx_exit(ctx); return; @@ -952,18 +944,6 @@ /* * This is the callback after some data has been read from the network */ -/* -void -HttpStateData::ReadReplyWrapper(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) -{ - HttpStateData *httpState = static_cast(data); - assert (fd == httpState->fd); - // assert(buf == readBuf->content()); - PROF_start(HttpStateData_readReply); - httpState->readReply (len, flag, xerrno); - PROF_stop(HttpStateData_readReply); -} -*/ /* XXX this function is too long! */ void HttpStateData::readReply (const CommIoCbParams &io) @@ -972,7 +952,9 @@ int clen; int len = io.size; - assert(fd == io.fd); + PROF_start(HttpStateData_readReply); + + Must(fd == io.fd); flags.do_next_read = 0; @@ -981,11 +963,13 @@ // Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us if (io.flag == COMM_ERR_CLOSING) { debugs(11, 3, "http socket closing"); + PROF_stop(HttpStateData_readReply); return; } if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { maybeReadVirginBody(); + PROF_stop(HttpStateData_readReply); return; } @@ -1003,7 +987,8 @@ flags.do_next_read = 0; comm_close(fd); } - + + PROF_stop(HttpStateData_readReply); return; } @@ -1044,6 +1029,7 @@ /* Timeout NOT increased. This whitespace was from previous reply */ flags.do_next_read = 1; maybeReadVirginBody(); + PROF_stop(HttpStateData_readReply); return; } } @@ -1060,8 +1046,10 @@ processReplyHeader(); PROF_stop(HttpStateData_processReplyHeader); - if (!continueAfterParsingHeader()) // parsing error or need more data + if (!continueAfterParsingHeader()) { // parsing error or need more data + PROF_stop(HttpStateData_readReply); return; // TODO: send errors to ICAP + } adaptOrFinalizeReply(); } @@ -1070,6 +1058,7 @@ PROF_start(HttpStateData_processReplyBody); processReplyBody(); // may call serverComplete() PROF_stop(HttpStateData_processReplyBody); + PROF_stop(HttpStateData_readReply); } // Checks whether we can continue with processing the body or doing ICAP. @@ -1107,12 +1096,12 @@ error = ERR_INVALID_RESP; } } else { - assert(eof); + Must(eof); error = readBuf->hasContent() ? ERR_INVALID_RESP : ERR_ZERO_SIZE_OBJECT; } - assert(error != ERR_NONE); + Must(error != ERR_NONE); entry->reset(); fwd->fail(errorCon(error, HTTP_BAD_GATEWAY, fwd->request)); flags.do_next_read = 0; @@ -1139,8 +1128,8 @@ const char *data = NULL; int len; bool status = false; - assert(flags.chunked); - assert(httpChunkDecoder); + Must(flags.chunked); + Must(httpChunkDecoder); SQUID_ENTER_THROWING_CODE(); MemBuf decodedData; decodedData.init(); @@ -1379,7 +1368,7 @@ const HttpHeaderEntry *e; String strFwd; HttpHeaderPos pos = HttpHeaderInitPos; - assert (hdr_out->owner == hoRequest); + Must (hdr_out->owner == hoRequest); /* append our IMS header */ if (request->lastmod > -1) @@ -1547,7 +1536,7 @@ httpHdrCcSetMaxAge(cc, getMaxAge(url)); if (request->urlpath.size()) - assert(strstr(url, request->urlpath.buf())); + Must(strstr(url, request->urlpath.buf())); } /* Set no-cache if determined needed but not found */ @@ -1789,7 +1778,7 @@ Dialer dialer(this, &HttpStateData::sentRequestBody); requestSender = asyncCall(11,5, "HttpStateData::sentRequestBody", dialer); } else { - assert(!requestBodySource); + Must(!requestBodySource); typedef CommCbMemFunT Dialer; Dialer dialer(this, &HttpStateData::sendComplete); requestSender = asyncCall(11,5, "HttpStateData::SendComplete", dialer); @@ -1904,7 +1893,7 @@ return; } - assert(requestBodySource != NULL); + Must(requestBodySource != NULL); if (requestBodySource->buf().hasContent()) { // XXX: why does not this trigger a debug message on every request? Index: squid3/src/http.h diff -u squid3/src/http.h:1.30 squid3/src/http.h:1.28.4.5 --- squid3/src/http.h:1.30 Tue Feb 12 16:50:41 2008 +++ squid3/src/http.h Thu Feb 14 13:55:08 2008 @@ -97,6 +97,8 @@ virtual void closeServer(); // end communication with the server virtual bool doneWithServer() const; // did we end communication? virtual void abortTransaction(const char *reason); // abnormal termination + //AsyncJob virtual methods + virtual bool abortOnException() {return true;}; // consuming request body virtual void handleMoreRequestBodyAvailable(); Index: squid3/src/main.cc diff -u squid3/src/main.cc:1.100 squid3/src/main.cc:1.89.4.9 --- squid3/src/main.cc:1.100 Sat Apr 12 17:13:46 2008 +++ squid3/src/main.cc Mon Apr 21 15:33:30 2008 @@ -150,6 +150,17 @@ }; }; +class XAssertsEngine : public AsyncEngine +{ + +public: + int checkEvents(int timeout) + { + TheAssertsPerStep = 0; + return EVENT_IDLE; + }; +}; + class SignalEngine: public AsyncEngine { @@ -1371,6 +1382,9 @@ mainLoop.registerEngine(&signalEngine); + XAssertsEngine xassertsEngine; + mainLoop.registerEngine(&xassertsEngine); + /* TODO: stop requiring the singleton here */ mainLoop.registerEngine(EventScheduler::GetInstance()); Index: squid3/src/stat.cc diff -u squid3/src/stat.cc:1.49 squid3/src/stat.cc:1.44.4.6 --- squid3/src/stat.cc:1.49 Fri Mar 21 19:52:24 2008 +++ squid3/src/stat.cc Mon Apr 21 15:33:37 2008 @@ -759,6 +759,9 @@ storeAppendPrintf(sentry, "Allocation Size\t Alloc Count\t Alloc Delta\t Allocs/sec \n"); malloc_statistics(info_get_mallstat, sentry); #endif + storeAppendPrintf(sentry, "Internal squid errors:\n"); + storeAppendPrintf(sentry, "\tSalvaged assertions: %6d\n", TheSalvagedAsserts); + } static void Index: squid3/src/structs.h diff -u squid3/src/structs.h:1.132 squid3/src/structs.h:1.116.4.5 --- squid3/src/structs.h:1.132 Mon Apr 14 17:13:14 2008 +++ squid3/src/structs.h Mon Apr 21 15:33:37 2008 @@ -629,6 +629,7 @@ #endif char *accept_filter; + int assert_burst_max; #if USE_LOADABLE_MODULES wordlist *loadable_module_names; Index: squid3/src/tunnel.cc diff -u squid3/src/tunnel.cc:1.36 squid3/src/tunnel.cc:1.33.4.4 --- squid3/src/tunnel.cc:1.36 Sun Jan 20 01:50:58 2008 +++ squid3/src/tunnel.cc Tue Mar 25 01:55:17 2008 @@ -46,8 +46,10 @@ #include "client_side.h" #include "MemBuf.h" #include "http.h" +#include "AsyncCall.h" +#include "ICAP/AsyncJob.h" -class TunnelStateData +class TunnelStateData: virtual public AsyncJob { public: @@ -55,10 +57,12 @@ class Connection; void *operator new(size_t); void operator delete (void *); - static void ReadClient(int fd, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data); - static void ReadServer(int fd, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data); - static void WriteClientDone(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data); - static void WriteServerDone(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data); + + void *toCbdata() { return this; } + virtual bool doneAll() const {return false && AsyncJob::doneAll();} + + TunnelStateData() : AsyncJob("TunnelStateData") {} + void close(); bool noConnections() const; char *url; @@ -103,78 +107,78 @@ Connection client, server; int *status_ptr; /* pointer to status for logging */ - void copyRead(Connection &from, IOCB *completion); - + void copyRead(Connection &from, AsyncCall::Pointer call); private: CBDATA_CLASS(TunnelStateData); - void copy (size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to, IOCB *); - void readServer(char *buf, size_t len, comm_err_t errcode, int xerrno); - void readClient(char *buf, size_t len, comm_err_t errcode, int xerrno); - void writeClientDone(char *buf, size_t len, comm_err_t flag, int xerrno); - void writeServerDone(char *buf, size_t len, comm_err_t flag, int xerrno); + void copy (size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to, AsyncCall::Pointer call); + +public: + + void tunnelConnected(int fd); + void tunnelProxyConnected(int fd); + +// CommCalls + void readServer(const CommIoCbParams &io); + void readClient(const CommIoCbParams &io); + void writeClientDone(const CommIoCbParams &io); + void writeServerDone(const CommIoCbParams &io); + void tunnelConnectedWriteDone(const CommIoCbParams &io); + void tunnelProxyConnectedWriteDone(const CommIoCbParams &io); + void tunnelServerClosed(const CommCloseCbParams &io); + void tunnelClientClosed(const CommCloseCbParams &io); + void tunnelTimeout(const CommTimeoutCbParams ¶ms); + void tunnelConnectTimeout(const CommTimeoutCbParams ¶ms); + void tunnelConnectDone(const CommConnectCbParams ¶ms); }; static const char *const conn_established = "HTTP/1.0 200 Connection established\r\n\r\n"; -static CNCB tunnelConnectDone; static ERCB tunnelErrorComplete; -static PF tunnelServerClosed; -static PF tunnelClientClosed; -static PF tunnelTimeout; static PSC tunnelPeerSelectComplete; -static void tunnelStateFree(TunnelStateData * tunnelState); -static void tunnelConnected(int fd, void *); -static void tunnelProxyConnected(int fd, void *); -static void -tunnelServerClosed(int fd, void *data) +void +TunnelStateData::tunnelServerClosed(const CommCloseCbParams &io) { - TunnelStateData *tunnelState = (TunnelStateData *)data; - debugs(26, 3, "tunnelServerClosed: FD " << fd); - assert(fd == tunnelState->server.fd()); - tunnelState->server.fd(-1); + debugs(26, 3, "tunnelServerClosed: FD " << io.fd); + Must(io.fd == server.fd()); + server.fd(-1); - if (tunnelState->noConnections()) { - tunnelStateFree(tunnelState); + if (noConnections()) { + close(); return; } - if (!tunnelState->server.len) { - comm_close(tunnelState->client.fd()); + if (!server.len) { + comm_close(client.fd()); return; } } -static void -tunnelClientClosed(int fd, void *data) +void +TunnelStateData::tunnelClientClosed(const CommCloseCbParams &io) { - TunnelStateData *tunnelState = (TunnelStateData *)data; - debugs(26, 3, "tunnelClientClosed: FD " << fd); - assert(fd == tunnelState->client.fd()); - tunnelState->client.fd(-1); + debugs(26, 3, "tunnelClientClosed: FD " << io.fd); + Must(io.fd == client.fd()); + client.fd(-1); - if (tunnelState->noConnections()) { - tunnelStateFree(tunnelState); + if (noConnections()) { + close(); return; } - if (!tunnelState->client.len) { - comm_close(tunnelState->server.fd()); + if (!client.len) { + comm_close(server.fd()); return; } } -static void -tunnelStateFree(TunnelStateData * tunnelState) -{ - debugs(26, 3, "tunnelStateFree: tunnelState=" << tunnelState); - assert(tunnelState != NULL); - assert(tunnelState->noConnections()); - safe_free(tunnelState->url); - FwdState::serversFree(&tunnelState->servers); - tunnelState->host = NULL; - HTTPMSGUNLOCK(tunnelState->request); - delete tunnelState; +void TunnelStateData::close() { + Must(noConnections()); + safe_free(url); + FwdState::serversFree(&servers); + host = NULL; + HTTPMSGUNLOCK(request); + deleteThis("TunnelStateData::tunnelState"); } TunnelStateData::Connection::~Connection() @@ -221,35 +225,28 @@ /* Read from server side and queue it for writing to the client */ void -TunnelStateData::ReadServer(int fd, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data) -{ - TunnelStateData *tunnelState = (TunnelStateData *)data; - assert (cbdataReferenceValid (tunnelState)); - - assert(fd == tunnelState->server.fd()); - tunnelState->readServer(buf, len, errcode, xerrno); -} - -void -TunnelStateData::readServer(char *buf, size_t len, comm_err_t errcode, int xerrno) +TunnelStateData::readServer(const CommIoCbParams &io) { /* * Bail out early on COMM_ERR_CLOSING * - close handlers will tidy up for us */ - if (errcode == COMM_ERR_CLOSING) + if (io.flag == COMM_ERR_CLOSING) return; - debugs(26, 3, "tunnelReadServer: FD " << server.fd() << ", read " << len << " bytes"); + debugs(26, 3, "tunnelReadServer: FD " << server.fd() << ", read " << io.size << " bytes"); - if (len > 0) { - server.bytesIn(len); - kb_incr(&statCounter.server.all.kbytes_in, len); - kb_incr(&statCounter.server.other.kbytes_in, len); + if (io.size > 0) { + server.bytesIn(io.size); + kb_incr(&statCounter.server.all.kbytes_in, io.size); + kb_incr(&statCounter.server.other.kbytes_in, io.size); } - copy (len, errcode, xerrno, server, client, WriteClientDone); + typedef CommCbMemFunT Dialer; + Dialer dialer(this, &TunnelStateData::writeClientDone); + AsyncCall::Pointer call = asyncCall(26, 5, "TunnelStateData::writeClientDone", dialer); + copy (io.size, io.flag, io.xerrno, server, client, call); } void @@ -270,38 +267,34 @@ /* Read from client side and queue it for writing to the server */ void -TunnelStateData::ReadClient(int fd, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data) +TunnelStateData::readClient(const CommIoCbParams &io) { - TunnelStateData *tunnelState = (TunnelStateData *)data; - assert (cbdataReferenceValid (tunnelState)); - assert(fd == tunnelState->client.fd()); - tunnelState->readClient(buf, len, errcode, xerrno); -} + Must(io.fd == client.fd()); -void -TunnelStateData::readClient(char *buf, size_t len, comm_err_t errcode, int xerrno) -{ /* * Bail out early on COMM_ERR_CLOSING * - close handlers will tidy up for us */ - if (errcode == COMM_ERR_CLOSING) + if (io.flag == COMM_ERR_CLOSING) return; - debugs(26, 3, "tunnelReadClient: FD " << client.fd() << ", read " << len << " bytes"); + debugs(26, 3, "tunnelReadClient: FD " << client.fd() << ", read " << io.size << " bytes"); - if (len > 0) { - client.bytesIn(len); - kb_incr(&statCounter.client_http.kbytes_in, len); + if (io.size > 0) { + client.bytesIn(io.size); + kb_incr(&statCounter.client_http.kbytes_in, io.size); } - copy (len, errcode, xerrno, client, server, WriteServerDone); + typedef CommCbMemFunT Dialer; + Dialer dialer(this, &TunnelStateData::writeServerDone); + AsyncCall::Pointer call = asyncCall(26, 5, "TunnelStateData::writeServerDone", dialer); + copy (io.size, io.flag, io.xerrno, client, server, call); } void -TunnelStateData::copy (size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to, IOCB *completion) +TunnelStateData::copy (size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to, AsyncCall::Pointer completion) { /* I think this is to prevent free-while-in-a-callback behaviour * - RBC 20030229 @@ -309,8 +302,12 @@ cbdataInternalLock(this); /* ??? should be locked by the caller... */ /* Bump the server connection timeout on any activity */ - if (server.fd() != -1) - commSetTimeout(server.fd(), Config.Timeout.read, tunnelTimeout, this); + if (server.fd() != -1) { + typedef CommCbMemFunT TimeoutDialer; + AsyncCall::Pointer timeoutCall = asyncCall(26, 5, "TunnelStateData::tunnelTimeout", + TimeoutDialer(this, &TunnelStateData::tunnelTimeout)); + commSetTimeout(server.fd(), Config.Timeout.read, timeoutCall); + } if (len < 0 || errcode) from.error (xerrno); @@ -322,43 +319,35 @@ comm_close(to.fd()); } } else if (cbdataReferenceValid(this)) - comm_write(to.fd(), from.buf, len, completion, this, NULL); + comm_write(to.fd(), from.buf, len, completion); cbdataInternalUnlock(this); /* ??? */ } /* Writes data from the client buffer to the server side */ void -TunnelStateData::WriteServerDone(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) +TunnelStateData::writeServerDone(const CommIoCbParams &io) { - TunnelStateData *tunnelState = (TunnelStateData *)data; - assert (cbdataReferenceValid (tunnelState)); - - assert(fd == tunnelState->server.fd()); - tunnelState->writeServerDone(buf, len, flag, xerrno); -} + Must(io.fd == server.fd()); -void -TunnelStateData::writeServerDone(char *buf, size_t len, comm_err_t flag, int xerrno) -{ - debugs(26, 3, "tunnelWriteServer: FD " << server.fd() << ", " << len << " bytes written"); + debugs(26, 3, "tunnelWriteServer: FD " << server.fd() << ", " << io.size << " bytes written"); /* Error? */ - if (len < 0 || flag != COMM_OK) { - server.error(xerrno); // may call comm_close + if (io.size < 0 || io.flag != COMM_OK) { + server.error(io.xerrno); // may call comm_close return; } /* EOF? */ - if (len == 0) { + if (io.size == 0) { comm_close(server.fd()); return; } /* Valid data */ - kb_incr(&statCounter.server.all.kbytes_out, len); - kb_incr(&statCounter.server.other.kbytes_out, len); - client.dataSent(len); + kb_incr(&statCounter.server.all.kbytes_out, io.size); + kb_incr(&statCounter.server.other.kbytes_out, io.size); + client.dataSent(io.size); /* If the other end has closed, so should we */ if (client.fd() == -1) { @@ -368,27 +357,21 @@ cbdataInternalLock(this); /* ??? should be locked by the caller... */ - if (cbdataReferenceValid(this)) - copyRead(client, ReadClient); + if (cbdataReferenceValid(this)) { + typedef CommCbMemFunT Dialer; + Dialer dialer(this, &TunnelStateData::readClient); + AsyncCall::Pointer call = asyncCall(26, 5, "TunnelStateData::readClient", dialer); + copyRead(client, call); + } cbdataInternalUnlock(this); /* ??? */ } /* Writes data from the server buffer to the client side */ void -TunnelStateData::WriteClientDone(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) -{ - TunnelStateData *tunnelState = (TunnelStateData *)data; - assert (cbdataReferenceValid (tunnelState)); - - assert(fd == tunnelState->client.fd()); - tunnelState->writeClientDone(buf, len, flag, xerrno); -} - -void TunnelStateData::Connection::dataSent (size_t amount) { - assert(amount == (size_t)len); + Must(amount == (size_t)len); len =0; /* increment total object size */ @@ -397,25 +380,27 @@ } void -TunnelStateData::writeClientDone(char *buf, size_t len, comm_err_t flag, int xerrno) +TunnelStateData::writeClientDone(const CommIoCbParams &io) { - debugs(26, 3, "tunnelWriteClient: FD " << client.fd() << ", " << len << " bytes written"); + Must(io.fd == client.fd()); + + debugs(26, 3, "tunnelWriteClient: FD " << client.fd() << ", " << io.size << " bytes written"); /* Error? */ - if (len < 0 || flag != COMM_OK) { - client.error(xerrno); // may call comm_close + if (io.size < 0 || io.flag != COMM_OK) { + client.error(io.xerrno); // may call comm_close return; } /* EOF? */ - if (len == 0) { + if (io.size == 0) { comm_close(client.fd()); return; } /* Valid data */ - kb_incr(&statCounter.client_http.kbytes_out, len); - server.dataSent(len); + kb_incr(&statCounter.client_http.kbytes_out, io.size); + server.dataSent(io.size); /* If the other end has closed, so should we */ if (server.fd() == -1) { @@ -423,25 +408,28 @@ return; } - cbdataInternalLock(this); /* ??? should be locked by the caller... */ +// cbdataInternalLock(this); /* ??? should be locked by the caller... */ - if (cbdataReferenceValid(this)) - copyRead(server, ReadServer); +// if (cbdataReferenceValid(this)){ + typedef CommCbMemFunT Dialer; + Dialer dialer(this, &TunnelStateData::readServer); + AsyncCall::Pointer call = asyncCall(26, 5, "TunnelStateData::readServer", dialer); + copyRead(server, call); +// } - cbdataInternalUnlock(this); /* ??? */ +// cbdataInternalUnlock(this); /* ??? */ } -static void -tunnelTimeout(int fd, void *data) +void +TunnelStateData::tunnelTimeout(const CommTimeoutCbParams ¶ms) { - TunnelStateData *tunnelState = (TunnelStateData *)data; - debugs(26, 3, "tunnelTimeout: FD " << fd); + debugs(26, 3, "tunnelTimeout: FD " << params.fd); /* Temporary lock to protect our own feets (comm_close -> tunnelClientClosed -> Free) */ - cbdataInternalLock(tunnelState); +// cbdataInternalLock(tunnelState); - tunnelState->client.closeIfOpen(); - tunnelState->server.closeIfOpen(); - cbdataInternalUnlock(tunnelState); + client.closeIfOpen(); + server.closeIfOpen(); +// cbdataInternalUnlock(tunnelState); } void @@ -452,79 +440,81 @@ } void -TunnelStateData::copyRead(Connection &from, IOCB *completion) +TunnelStateData::copyRead(Connection &from, AsyncCall::Pointer call) { - assert(from.len == 0); - comm_read(from.fd(), from.buf, from.bytesWanted(1, SQUID_TCP_SO_RCVBUF), completion, this); + Must(from.len == 0); + comm_read(from.fd(), from.buf, from.bytesWanted(1, SQUID_TCP_SO_RCVBUF), call); } -static void -tunnelConnectTimeout(int fd, void *data) +void +TunnelStateData::tunnelConnectTimeout(const CommTimeoutCbParams ¶ms) { - TunnelStateData *tunnelState = (TunnelStateData *)data; - HttpRequest *request = tunnelState->request; ErrorState *err = NULL; - if (tunnelState->servers->_peer) - hierarchyNote(&tunnelState->request->hier, tunnelState->servers->code, - tunnelState->servers->_peer->host); + if (servers->_peer) + hierarchyNote(&request->hier, servers->code, + servers->_peer->host); else if (Config.onoff.log_ip_on_direct) - hierarchyNote(&tunnelState->request->hier, tunnelState->servers->code, - fd_table[tunnelState->server.fd()].ipaddr); + hierarchyNote(&request->hier, servers->code, fd_table[server.fd()].ipaddr); else - hierarchyNote(&tunnelState->request->hier, tunnelState->servers->code, - tunnelState->host); + hierarchyNote(&request->hier, servers->code, host); err = errorCon(ERR_CONNECT_FAIL, HTTP_SERVICE_UNAVAILABLE, request); - *tunnelState->status_ptr = HTTP_SERVICE_UNAVAILABLE; + *status_ptr = HTTP_SERVICE_UNAVAILABLE; err->xerrno = ETIMEDOUT; - err->port = tunnelState->port; + err->port = port; err->callback = tunnelErrorComplete; - err->callback_data = tunnelState; + err->callback_data = this; - errorSend(tunnelState->client.fd(), err); - comm_close(fd); + errorSend(client.fd(), err); + comm_close(params.fd); } -static void -tunnelConnectedWriteDone(int fd, char *buf, size_t size, comm_err_t flag, int xerrno, void *data) +void +TunnelStateData::tunnelConnectedWriteDone(const CommIoCbParams &io) { - TunnelStateData *tunnelState = (TunnelStateData *)data; - if (flag != COMM_OK) { - tunnelErrorComplete(fd, data, 0); + if (io.flag != COMM_OK) { + tunnelErrorComplete(io.fd, this, 0); return; } - if (cbdataReferenceValid(tunnelState)) { - tunnelState->copyRead(tunnelState->server, TunnelStateData::ReadServer); - tunnelState->copyRead(tunnelState->client, TunnelStateData::ReadClient); - } + typedef CommCbMemFunT Dialer; + Dialer readServerDialer(this, &TunnelStateData::readServer); + AsyncCall::Pointer readServerCall = asyncCall(26, 5, "TunnelStateData::readServer", readServerDialer); + copyRead(server, readServerCall); + + typedef CommCbMemFunT Dialer; + Dialer readClientDialer(this, &TunnelStateData::readClient); + AsyncCall::Pointer readClientCall = asyncCall(26, 5, "TunnelStateData::readClient", readClientDialer); + copyRead(client, readClientCall); } /* * handle the write completion from a proxy request to an upstream proxy */ -static void -tunnelProxyConnectedWriteDone(int fd, char *buf, size_t size, comm_err_t flag, int xerrno, void *data) +void +TunnelStateData::tunnelProxyConnectedWriteDone(const CommIoCbParams &io) { - tunnelConnectedWriteDone(fd, buf, size, flag, xerrno, data); + tunnelConnectedWriteDone(io); } -static void -tunnelConnected(int fd, void *data) +void +TunnelStateData::tunnelConnected(int fd) { - TunnelStateData *tunnelState = (TunnelStateData *)data; - debugs(26, 3, "tunnelConnected: FD " << fd << " tunnelState=" << tunnelState); - *tunnelState->status_ptr = HTTP_OK; - comm_write(tunnelState->client.fd(), conn_established, strlen(conn_established), - tunnelConnectedWriteDone, tunnelState, NULL); + debugs(26, 3, "tunnelConnected: FD " << fd << " tunnelState=" << this); + *status_ptr = HTTP_OK; + + typedef CommCbMemFunT Dialer; + AsyncCall::Pointer call = asyncCall(26, 5, "TunnelStateData::tunnelConnectedWriteDone", + Dialer(this, &TunnelStateData::tunnelConnectedWriteDone)); + comm_write(client.fd(), conn_established, strlen(conn_established), call); } static void @@ -545,50 +535,50 @@ } -static void -tunnelConnectDone(int fdnotused, comm_err_t status, int xerrno, void *data) +void +TunnelStateData::tunnelConnectDone(const CommConnectCbParams ¶ms) { - TunnelStateData *tunnelState = (TunnelStateData *)data; - HttpRequest *request = tunnelState->request; ErrorState *err = NULL; - if (tunnelState->servers->_peer) - hierarchyNote(&tunnelState->request->hier, tunnelState->servers->code, - tunnelState->servers->_peer->host); + if (servers->_peer) + hierarchyNote(&request->hier, servers->code, + servers->_peer->host); else if (Config.onoff.log_ip_on_direct) - hierarchyNote(&tunnelState->request->hier, tunnelState->servers->code, - fd_table[tunnelState->server.fd()].ipaddr); + hierarchyNote(&request->hier, servers->code, + fd_table[server.fd()].ipaddr); else - hierarchyNote(&tunnelState->request->hier, tunnelState->servers->code, - tunnelState->host); + hierarchyNote(&request->hier, servers->code, + host); - if (status == COMM_ERR_DNS) { - debugs(26, 4, "tunnelConnect: Unknown host: " << tunnelState->host); + if (params.flag == COMM_ERR_DNS) { + debugs(26, 4, "tunnelConnect: Unknown host: " << host); err = errorCon(ERR_DNS_FAIL, HTTP_NOT_FOUND, request); - *tunnelState->status_ptr = HTTP_NOT_FOUND; + *status_ptr = HTTP_NOT_FOUND; err->dnsserver_msg = xstrdup(dns_error_message); err->callback = tunnelErrorComplete; - err->callback_data = tunnelState; - errorSend(tunnelState->client.fd(), err); - } else if (status != COMM_OK) { + err->callback_data = this; + errorSend(client.fd(), err); + } else if (params.flag != COMM_OK) { err = errorCon(ERR_CONNECT_FAIL, HTTP_SERVICE_UNAVAILABLE, request); - *tunnelState->status_ptr = HTTP_SERVICE_UNAVAILABLE; - err->xerrno = xerrno; - err->port = tunnelState->port; + *status_ptr = HTTP_SERVICE_UNAVAILABLE; + err->xerrno = params.xerrno; + err->port = port; err->callback = tunnelErrorComplete; - err->callback_data = tunnelState; - errorSend(tunnelState->client.fd(), err); + err->callback_data = this; + errorSend(client.fd(), err); } else { - if (tunnelState->servers->_peer) - tunnelProxyConnected(tunnelState->server.fd(), tunnelState); + if (servers->_peer) + tunnelProxyConnected(server.fd()); else { - tunnelConnected(tunnelState->server.fd(), tunnelState); + tunnelConnected(server.fd()); } - commSetTimeout(tunnelState->server.fd(), + typedef CommCbMemFunT TimeoutDialer; + AsyncCall::Pointer timeoutCall = asyncCall(26, 5, "TunnelStateData::tunnelTimeout", + TimeoutDialer(this, &TunnelStateData::tunnelTimeout)); + commSetTimeout(server.fd(), Config.Timeout.read, - tunnelTimeout, - tunnelState); + timeoutCall); } } @@ -662,20 +652,30 @@ tunnelState->status_ptr = status_ptr; tunnelState->client.fd(fd); tunnelState->server.fd(sock); - comm_add_close_handler(tunnelState->server.fd(), - tunnelServerClosed, - tunnelState); - comm_add_close_handler(tunnelState->client.fd(), - tunnelClientClosed, - tunnelState); - commSetTimeout(tunnelState->client.fd(), - Config.Timeout.lifetime, - tunnelTimeout, - tunnelState); + + typedef CommCbMemFunT closeDialer; + + AsyncCall::Pointer serverCloseCall = asyncCall(26, 5, "TunnelStateData::tunnelServerClosed", + closeDialer(tunnelState, &TunnelStateData::tunnelServerClosed)); + comm_add_close_handler(tunnelState->server.fd(), serverCloseCall); + + AsyncCall::Pointer clientCloseCall = asyncCall(26, 5, "TunnelStateData::tunnelClientClosed", + closeDialer(tunnelState, &TunnelStateData::tunnelClientClosed)); + comm_add_close_handler(tunnelState->client.fd(), clientCloseCall); + + typedef CommCbMemFunT TimeoutDialer; + AsyncCall::Pointer timeoutCall = asyncCall(26, 5, "TunnelStateData::tunnelTimeout", + TimeoutDialer(tunnelState, &TunnelStateData::tunnelTimeout)); + commSetTimeout(tunnelState->client.fd(), + Config.Timeout.lifetime, + timeoutCall); + + AsyncCall::Pointer connectTimeoutCall = asyncCall(26, 5, "TunnelStateData::tunnelConnectTimeout", + TimeoutDialer(tunnelState, &TunnelStateData::tunnelConnectTimeout)); commSetTimeout(tunnelState->server.fd(), - Config.Timeout.connect, - tunnelConnectTimeout, - tunnelState); + Config.Timeout.connect, + connectTimeoutCall); + peerSelect(request, NULL, tunnelPeerSelectComplete, @@ -687,21 +687,20 @@ commSetSelect(tunnelState->client.fd(), COMM_SELECT_READ, NULL, NULL, 0); } -static void -tunnelProxyConnected(int fd, void *data) +void +TunnelStateData::tunnelProxyConnected(int fd) { - TunnelStateData *tunnelState = (TunnelStateData *)data; HttpHeader hdr_out(hoRequest); Packer p; http_state_flags flags; - debugs(26, 3, "tunnelProxyConnected: FD " << fd << " tunnelState=" << tunnelState); + debugs(26, 3, "tunnelProxyConnected: FD " << fd << " tunnelState=" << this); memset(&flags, '\0', sizeof(flags)); - flags.proxying = tunnelState->request->flags.proxying; + flags.proxying = request->flags.proxying; MemBuf mb; mb.init(); - mb.Printf("CONNECT %s HTTP/1.0\r\n", tunnelState->url); - HttpStateData::httpBuildRequestHeader(tunnelState->request, - tunnelState->request, + mb.Printf("CONNECT %s HTTP/1.0\r\n", url); + HttpStateData::httpBuildRequestHeader(request, + request, NULL, /* StoreEntry */ &hdr_out, flags); /* flags */ @@ -711,8 +710,16 @@ packerClean(&p); mb.append("\r\n", 2); - comm_write_mbuf(tunnelState->server.fd(), &mb, tunnelProxyConnectedWriteDone, tunnelState); - commSetTimeout(tunnelState->server.fd(), Config.Timeout.read, tunnelTimeout, tunnelState); + typedef CommCbMemFunT WriteDialer; + AsyncCall::Pointer writeCall = asyncCall(26, 5, "TunnelStateData::tunnelProxyConnectedWriteDone", + WriteDialer(this, &TunnelStateData::tunnelProxyConnectedWriteDone)); + + comm_write_mbuf(server.fd(), &mb, writeCall); + + typedef CommCbMemFunT TimeoutDialer; + AsyncCall::Pointer timeoutCall = asyncCall(26, 5, "TunnelStateData::tunnelTimeout", + TimeoutDialer(this, &TunnelStateData::tunnelTimeout)); + commSetTimeout(server.fd(), Config.Timeout.read, timeoutCall); } static void @@ -760,11 +767,14 @@ #endif + typedef CommCbMemFunT Dialer; + AsyncCall::Pointer call = asyncCall(26, 5, "TunnelStateData::tunnelConnectDone", + Dialer(tunnelState, &TunnelStateData::tunnelConnectDone)); + commConnectStart(tunnelState->server.fd(), tunnelState->host, tunnelState->port, - tunnelConnectDone, - tunnelState); + call); } CBDATA_CLASS_INIT(TunnelStateData); Index: squid3/src/ICAP/AsyncJob.cc diff -u squid3/src/ICAP/AsyncJob.cc:1.6 squid3/src/ICAP/AsyncJob.cc:1.3.4.15 --- squid3/src/ICAP/AsyncJob.cc:1.6 Thu Apr 17 14:52:44 2008 +++ squid3/src/ICAP/AsyncJob.cc Mon Apr 21 15:33:43 2008 @@ -119,6 +119,9 @@ { // we must be called asynchronously and hence, the caller must lock us Must(cbdataReferenceValid(toCbdata())); + + if(abortOnException()) + abort(); mustStop("exception"); } @@ -210,12 +213,15 @@ job->callStart(call); try { + WillCatchException(call.debugSection, 3, call.name); doDial(); + WontCatchException(); } catch (const TextException &e) { debugs(call.debugSection, 3, HERE << call.name << " threw exception: " << e.message); job->callException(e); + WontCatchException(); } job->callEnd(); // may delete job Index: squid3/src/ICAP/AsyncJob.h diff -u squid3/src/ICAP/AsyncJob.h:1.5 squid3/src/ICAP/AsyncJob.h:1.3.14.12 --- squid3/src/ICAP/AsyncJob.h:1.5 Tue Feb 26 13:50:57 2008 +++ squid3/src/ICAP/AsyncJob.h Mon Apr 21 15:33:43 2008 @@ -65,6 +65,7 @@ void callStart(AsyncCall &call); virtual void callException(const TextException &e); virtual void callEnd(); + virtual bool abortOnException() {return false;}; protected: const char *stopReason; // reason for forcing done() to be true