From a49d88568fef290f1ad3ce5656d0e30d41d61606 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Sur=C3=BD?= Date: Wed, 11 Nov 2020 10:46:33 +0100 Subject: [PATCH] Turn all the callback to be always asynchronous When calling the high level netmgr functions, the callback would be sometimes called synchronously if we catch the failure directly, or asynchronously if it happens later. The synchronous call to the callback could create deadlocks as the caller would not expect the failed callback to be executed directly. --- lib/isc/netmgr/netmgr-int.h | 60 ++++++++ lib/isc/netmgr/netmgr.c | 204 ++++++++++++++++++++++++- lib/isc/netmgr/tcp.c | 296 +++++++++++++++++------------------- lib/isc/netmgr/tcpdns.c | 31 ++-- lib/isc/netmgr/tls.c | 6 +- lib/isc/netmgr/udp.c | 242 ++++++++++------------------- 6 files changed, 497 insertions(+), 342 deletions(-) diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 5b63dfc98d..4b56b1b772 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -172,6 +172,11 @@ typedef enum isc__netievent_type { netievent_stop, netievent_pause, + netievent_connectcb, + netievent_acceptcb, + netievent_readcb, + netievent_sendcb, + netievent_prio = 0xff, /* event type values higher than this * will be treated as high-priority * events, which can be processed @@ -187,6 +192,7 @@ typedef union { isc_nm_recv_cb_t recv; isc_nm_cb_t send; isc_nm_cb_t connect; + isc_nm_accept_cb_t accept; } isc__nm_cb_t; /* @@ -260,6 +266,18 @@ typedef isc__netievent__socket_req_t isc__netievent_tcplisten_t; typedef isc__netievent__socket_req_t isc__netievent_tcpsend_t; typedef isc__netievent__socket_req_t isc__netievent_tcpdnssend_t; +typedef struct isc__netievent__socket_req_result { + isc__netievent_type type; + isc_nmsocket_t *sock; + isc__nm_uvreq_t *req; + isc_result_t result; +} isc__netievent__socket_req_result_t; + +typedef isc__netievent__socket_req_result_t isc__netievent_connectcb_t; +typedef isc__netievent__socket_req_result_t isc__netievent_acceptcb_t; +typedef isc__netievent__socket_req_result_t isc__netievent_readcb_t; +typedef isc__netievent__socket_req_result_t isc__netievent_sendcb_t; + typedef struct isc__netievent__socket_streaminfo_quota { isc__netievent_type type; isc_nmsocket_t *sock; @@ -746,6 +764,48 @@ isc__nmsocket_clearcb(isc_nmsocket_t *sock); * Clear the recv and accept callbacks in 'sock'. */ +void +isc__nm_connectcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq, + isc_result_t eresult); +void +isc__nm_async_connectcb(isc__networker_t *worker, isc__netievent_t *ev0); +/*%< + * Issue a connect callback on the socket, used to call the callback + * on failed conditions when the event can't be scheduled on the uv loop. + */ + +void +isc__nm_acceptcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq, + isc_result_t eresult); +void +isc__nm_async_acceptcb(isc__networker_t *worker, isc__netievent_t *ev0); +/*%< + * Issue a accept callback on the socket, used to call the callback + * on failed conditions when the event can't be scheduled on the uv loop. + */ + +void +isc__nm_readcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq, + isc_result_t eresult); +void +isc__nm_async_readcb(isc__networker_t *worker, isc__netievent_t *ev0); + +/*%< + * Issue a read callback on the socket, used to call the callback + * on failed conditions when the event can't be scheduled on the uv loop. + * + */ + +void +isc__nm_sendcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq, + isc_result_t eresult); +void +isc__nm_async_sendcb(isc__networker_t *worker, isc__netievent_t *ev0); +/*%< + * Issue a write callback on the socket, used to call the callback + * on failed conditions when the event can't be scheduled on the uv loop. + */ + void isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ev0); /*%< diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index b7a8a29d8c..632b636733 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -700,9 +700,22 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) { isc__nm_async_tls_do_bio(worker, ievent); break; + case netievent_connectcb: + isc__nm_async_connectcb(worker, ievent); + break; + case netievent_acceptcb: + isc__nm_async_acceptcb(worker, ievent); + break; + case netievent_readcb: + isc__nm_async_readcb(worker, ievent); + break; + case netievent_sendcb: + isc__nm_async_sendcb(worker, ievent); + break; case netievent_closecb: isc__nm_async_closecb(worker, ievent); break; + case netievent_detach: isc__nm_async_detach(worker, ievent); break; @@ -1645,17 +1658,194 @@ isc_nm_stoplistening(isc_nmsocket_t *sock) { } void -isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ev0) { - isc__netievent_closecb_t *ievent = (isc__netievent_closecb_t *)ev0; +isc__nm_connectcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq, + isc_result_t eresult) { + isc__netievent_connectcb_t *ievent = + isc__nm_get_ievent(sock->mgr, netievent_connectcb); - REQUIRE(VALID_NMSOCK(ievent->sock)); - REQUIRE(ievent->sock->tid == isc_nm_tid()); - REQUIRE(ievent->sock->closehandle_cb != NULL); + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(VALID_UVREQ(uvreq)); + REQUIRE(VALID_NMHANDLE(uvreq->handle)); + + ievent->sock = sock; + ievent->req = uvreq; + ievent->result = eresult; + + if (eresult == ISC_R_SUCCESS) { + isc__nm_async_connectcb(&sock->mgr->workers[sock->tid], + (isc__netievent_t *)ievent); + isc__nm_put_ievent(sock->mgr, ievent); + } else { + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *)ievent); + } +} + +void +isc__nm_async_connectcb(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_connectcb_t *ievent = (isc__netievent_connectcb_t *)ev0; + isc_nmsocket_t *sock = ievent->sock; + isc__nm_uvreq_t *uvreq = ievent->req; + isc_result_t eresult = ievent->result; UNUSED(worker); - ievent->sock->closehandle_cb(ievent->sock); - isc__nmsocket_detach(&ievent->sock); + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(VALID_UVREQ(uvreq)); + REQUIRE(VALID_NMHANDLE(uvreq->handle)); + REQUIRE(ievent->sock->tid == isc_nm_tid()); + REQUIRE(uvreq->cb.connect != NULL); + + uvreq->cb.connect(uvreq->handle, eresult, uvreq->cbarg); + + isc__nm_uvreq_put(&uvreq, sock); +} + +void +isc__nm_acceptcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq, + isc_result_t eresult) { + isc__netievent_acceptcb_t *ievent = + isc__nm_get_ievent(sock->mgr, netievent_acceptcb); + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(VALID_UVREQ(uvreq)); + REQUIRE(VALID_NMHANDLE(uvreq->handle)); + + ievent->sock = sock; + ievent->req = uvreq; + ievent->result = eresult; + + if (eresult == ISC_R_SUCCESS) { + isc__nm_async_acceptcb(&sock->mgr->workers[sock->tid], + (isc__netievent_t *)ievent); + isc__nm_put_ievent(sock->mgr, ievent); + } else { + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *)ievent); + } +} + +void +isc__nm_async_acceptcb(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_acceptcb_t *ievent = (isc__netievent_acceptcb_t *)ev0; + isc_nmsocket_t *sock = ievent->sock; + isc__nm_uvreq_t *uvreq = ievent->req; + isc_result_t eresult = ievent->result; + + UNUSED(worker); + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(VALID_UVREQ(uvreq)); + REQUIRE(VALID_NMHANDLE(uvreq->handle)); + REQUIRE(sock->tid == isc_nm_tid()); + REQUIRE(uvreq->cb.accept != NULL); + + uvreq->cb.accept(uvreq->handle, eresult, uvreq->cbarg); + + isc__nm_uvreq_put(&uvreq, sock); +} + +void +isc__nm_readcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq, + isc_result_t eresult) { + isc__netievent_readcb_t *ievent = isc__nm_get_ievent(sock->mgr, + netievent_readcb); + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(VALID_UVREQ(uvreq)); + REQUIRE(VALID_NMHANDLE(uvreq->handle)); + + ievent->sock = sock; + ievent->req = uvreq; + ievent->result = eresult; + + if (eresult == ISC_R_SUCCESS) { + isc__nm_async_readcb(&sock->mgr->workers[sock->tid], + (isc__netievent_t *)ievent); + isc__nm_put_ievent(sock->mgr, ievent); + } else { + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *)ievent); + } +} + +void +isc__nm_async_readcb(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_readcb_t *ievent = (isc__netievent_readcb_t *)ev0; + isc_nmsocket_t *sock = ievent->sock; + isc__nm_uvreq_t *uvreq = ievent->req; + isc_result_t eresult = ievent->result; + isc_region_t region = { .base = (unsigned char *)uvreq->uvbuf.base, + .length = uvreq->uvbuf.len }; + + UNUSED(worker); + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(VALID_UVREQ(uvreq)); + REQUIRE(VALID_NMHANDLE(uvreq->handle)); + REQUIRE(sock->tid == isc_nm_tid()); + + uvreq->cb.recv(uvreq->handle, eresult, ®ion, uvreq->cbarg); + + isc__nm_uvreq_put(&uvreq, sock); +} + +void +isc__nm_sendcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq, + isc_result_t eresult) { + isc__netievent_sendcb_t *ievent = isc__nm_get_ievent(sock->mgr, + netievent_sendcb); + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(VALID_UVREQ(uvreq)); + REQUIRE(VALID_NMHANDLE(uvreq->handle)); + + ievent->sock = sock; + ievent->req = uvreq; + ievent->result = eresult; + + if (eresult == ISC_R_SUCCESS) { + isc__nm_async_sendcb(&sock->mgr->workers[sock->tid], + (isc__netievent_t *)ievent); + isc__nm_put_ievent(sock->mgr, ievent); + } else { + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *)ievent); + } +} + +void +isc__nm_async_sendcb(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_sendcb_t *ievent = (isc__netievent_sendcb_t *)ev0; + isc_nmsocket_t *sock = ievent->sock; + isc__nm_uvreq_t *uvreq = ievent->req; + isc_result_t eresult = ievent->result; + + UNUSED(worker); + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(VALID_UVREQ(uvreq)); + REQUIRE(VALID_NMHANDLE(uvreq->handle)); + REQUIRE(sock->tid == isc_nm_tid()); + + uvreq->cb.send(uvreq->handle, eresult, uvreq->cbarg); + + isc__nm_uvreq_put(&uvreq, sock); +} + +void +isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_closecb_t *ievent = (isc__netievent_closecb_t *)ev0; + isc_nmsocket_t *sock = ievent->sock; + + REQUIRE(VALID_NMSOCK(ievent->sock)); + REQUIRE(sock->tid == isc_nm_tid()); + REQUIRE(sock->closehandle_cb != NULL); + + UNUSED(worker); + + ievent->sock->closehandle_cb(sock); + isc__nmsocket_detach(&sock); } void diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 83236e4aeb..4997d6f8de 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -49,7 +49,7 @@ can_log_tcp_quota(void) { return (false); } -static int +static isc_result_t tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req); static void @@ -80,6 +80,17 @@ quota_accept_cb(isc_quota_t *quota, void *sock0); static void failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult); +static void +failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, + isc_result_t eresult); + +static bool +inactive(isc_nmsocket_t *sock) { + return (!isc__nmsocket_active(sock) || + atomic_load(&sock->mgr->closing) || + (sock->server != NULL && !isc__nmsocket_active(sock->server))); +} + static void failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) { /* @@ -126,20 +137,17 @@ failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, } if (!atomic_load(&sock->connecting)) { + isc__nm_uvreq_put(&req, sock); return; } - atomic_store(&sock->connecting, false); isc__nmsocket_clearcb(sock); if (req->cb.connect != NULL) { - req->cb.connect(NULL, eresult, req->cbarg); + isc__nm_connectcb(sock, req, eresult); + } else { + isc__nm_uvreq_put(&req, sock); } - req->cb.connect = NULL; - req->cbarg = NULL; - - isc__nm_uvreq_put(&req, sock); - isc__nmsocket_detach(&sock); } static void @@ -147,16 +155,22 @@ connecttimeout_cb(uv_timer_t *handle) { isc__nm_uvreq_t *req = uv_handle_get_data((uv_handle_t *)handle); isc_nmsocket_t *sock = req->sock; + REQUIRE(VALID_UVREQ(req)); + REQUIRE(VALID_NMHANDLE(req->handle)); REQUIRE(sock->tid == isc_nm_tid()); failed_connect_cb(sock, req, ISC_R_TIMEDOUT); + isc__nmsocket_detach(&sock); } -static int +static isc_result_t tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { isc__networker_t *worker = NULL; int r; + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(VALID_UVREQ(req)); + REQUIRE(isc__nm_in_netthread()); REQUIRE(sock->tid == isc_nm_tid()); @@ -169,11 +183,8 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]); atomic_store(&sock->closing, true); atomic_store(&sock->closed, true); - atomic_store(&sock->result, isc__nm_uverr2result(r)); - atomic_store(&sock->connect_error, true); - failed_connect_cb(sock, req, isc__nm_uverr2result(r)); atomic_store(&sock->active, false); - return (r); + return (isc__nm_uverr2result(r)); } if (req->local.length != 0) { @@ -181,12 +192,9 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { if (r != 0) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]); - atomic_store(&sock->result, isc__nm_uverr2result(r)); - atomic_store(&sock->connect_error, true); - failed_connect_cb(sock, req, isc__nm_uverr2result(r)); atomic_store(&sock->active, false); isc__nm_tcp_close(sock); - return (r); + return (isc__nm_uverr2result(r)); } } @@ -203,12 +211,9 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { if (r != 0) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECTFAIL]); - atomic_store(&sock->result, isc__nm_uverr2result(r)); - atomic_store(&sock->connect_error, true); - failed_connect_cb(sock, req, isc__nm_uverr2result(r)); atomic_store(&sock->active, false); isc__nm_tcp_close(sock); - return (r); + return (isc__nm_uverr2result(r)); } isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]); @@ -216,7 +221,7 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { 0); sock->timer_running = true; - return (0); + return (ISC_R_SUCCESS); } void @@ -225,22 +230,28 @@ isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0) { (isc__netievent_tcpconnect_t *)ev0; isc_nmsocket_t *sock = ievent->sock; isc__nm_uvreq_t *req = ievent->req; - int r; - - REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(sock->tid == isc_nm_tid()); + isc_result_t result = ISC_R_SUCCESS; UNUSED(worker); - r = tcp_connect_direct(sock, req); - if (r != 0) { - LOCK(&sock->lock); - SIGNAL(&sock->cond); - UNLOCK(&sock->lock); - return; - } + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->type == isc_nm_tcpsocket); + REQUIRE(sock->iface != NULL); + REQUIRE(sock->parent == NULL); + REQUIRE(sock->tid == isc_nm_tid()); - atomic_store(&sock->connected, true); + req->handle = isc__nmhandle_get(sock, &req->peer, &sock->iface->addr); + result = tcp_connect_direct(sock, req); + atomic_store(&sock->result, result); + if (result == ISC_R_SUCCESS) { + atomic_store(&sock->connected, true); + /* uvreq will be freed in tcp_connect_cb */ + /* socket will be detached in tcp_connect_cb */ + } else { + atomic_store(&sock->connect_error, true); + isc__nm_uvreq_put(&req, sock); + isc__nmsocket_detach(&ievent->sock); + } LOCK(&sock->lock); SIGNAL(&sock->cond); @@ -250,28 +261,32 @@ isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0) { static void tcp_connect_cb(uv_connect_t *uvreq, int status) { isc_result_t result; - isc__nm_uvreq_t *req = uv_handle_get_data((uv_handle_t *)uvreq); + isc__nm_uvreq_t *req = NULL; isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)uvreq->handle); struct sockaddr_storage ss; - isc_nmhandle_t *handle = NULL; int r; REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); + /* We timed out */ + if (!atomic_load(&sock->connecting)) { + return; + } + + req = uv_handle_get_data((uv_handle_t *)uvreq); + + REQUIRE(VALID_UVREQ(req)); + REQUIRE(VALID_NMHANDLE(req->handle)); + if (sock->timer_running) { uv_timer_stop(&sock->timer); sock->timer_running = false; } - if (!atomic_load(&sock->connecting)) { - return; - } - - REQUIRE(VALID_UVREQ(req)); - if (status != 0) { failed_connect_cb(sock, req, isc__nm_uverr2result(status)); + isc__nmsocket_detach(&sock); return; } @@ -280,6 +295,7 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) { &(int){ sizeof(ss) }); if (r != 0) { failed_connect_cb(sock, req, isc__nm_uverr2result(r)); + isc__nmsocket_detach(&sock); return; } @@ -288,21 +304,12 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) { result = isc_sockaddr_fromsockaddr(&sock->peer, (struct sockaddr *)&ss); RUNTIME_CHECK(result == ISC_R_SUCCESS); - handle = isc__nmhandle_get(sock, NULL, NULL); - req->cb.connect(handle, ISC_R_SUCCESS, req->cbarg); - - isc__nm_uvreq_put(&req, sock); + isc__nm_connectcb(sock, req, ISC_R_SUCCESS); /* * The sock is now attached to the handle. */ isc__nmsocket_detach(&sock); - - /* - * The connect callback should have attached to the handle. - * If it didn't, the socket will be closed now. - */ - isc_nmhandle_detach(&handle); } isc_result_t @@ -310,7 +317,7 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, isc_nm_cb_t cb, void *cbarg, unsigned int timeout, size_t extrahandlesize) { isc_result_t result = ISC_R_SUCCESS; - isc_nmsocket_t *nsock = NULL, *tmp = NULL; + isc_nmsocket_t *sock = NULL, *tmp = NULL; isc__netievent_tcpconnect_t *ievent = NULL; isc__nm_uvreq_t *req = NULL; @@ -318,50 +325,50 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, REQUIRE(local != NULL); REQUIRE(peer != NULL); - nsock = isc_mem_get(mgr->mctx, sizeof(*nsock)); - isc__nmsocket_init(nsock, mgr, isc_nm_tcpsocket, local); + sock = isc_mem_get(mgr->mctx, sizeof(*sock)); + isc__nmsocket_init(sock, mgr, isc_nm_tcpsocket, local); - nsock->extrahandlesize = extrahandlesize; - nsock->connect_timeout = timeout; + sock->extrahandlesize = extrahandlesize; + sock->connect_timeout = timeout; - atomic_init(&nsock->result, ISC_R_SUCCESS); - atomic_init(&nsock->client, true); + atomic_init(&sock->result, ISC_R_SUCCESS); + atomic_init(&sock->client, true); - req = isc__nm_uvreq_get(mgr, nsock); + req = isc__nm_uvreq_get(mgr, sock); req->cb.connect = cb; req->cbarg = cbarg; req->peer = peer->addr; req->local = local->addr; ievent = isc__nm_get_ievent(mgr, netievent_tcpconnect); - ievent->sock = nsock; + ievent->sock = sock; ievent->req = req; /* * Async callbacks can dereference the socket in the meantime, * we need to hold an additional reference to it. */ - isc__nmsocket_attach(nsock, &tmp); + isc__nmsocket_attach(sock, &tmp); if (isc__nm_in_netthread()) { - nsock->tid = isc_nm_tid(); - isc__nm_async_tcpconnect(&mgr->workers[nsock->tid], + sock->tid = isc_nm_tid(); + isc__nm_async_tcpconnect(&mgr->workers[sock->tid], (isc__netievent_t *)ievent); isc__nm_put_ievent(mgr, ievent); } else { - nsock->tid = isc_random_uniform(mgr->nworkers); - isc__nm_enqueue_ievent(&mgr->workers[nsock->tid], + sock->tid = isc_random_uniform(mgr->nworkers); + isc__nm_enqueue_ievent(&mgr->workers[sock->tid], (isc__netievent_t *)ievent); - LOCK(&nsock->lock); - while (!atomic_load(&nsock->connected) && - !atomic_load(&nsock->connect_error)) { - WAIT(&nsock->cond, &nsock->lock); + LOCK(&sock->lock); + while (!atomic_load(&sock->connected) && + !atomic_load(&sock->connect_error)) { + WAIT(&sock->cond, &sock->lock); } - UNLOCK(&nsock->lock); + UNLOCK(&sock->lock); } - result = atomic_load(&nsock->result); + result = atomic_load(&sock->result); isc__nmsocket_detach(&tmp); @@ -581,13 +588,11 @@ isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_tcpchildaccept_t *ievent = (isc__netievent_tcpchildaccept_t *)ev0; isc_nmsocket_t *sock = ievent->sock; - isc_nmhandle_t *handle; isc_result_t result; + isc__nm_uvreq_t *req = NULL; struct sockaddr_storage ss; isc_sockaddr_t local; int r; - isc_nm_accept_cb_t accept_cb; - void *accept_cbarg; REQUIRE(isc__nm_in_netthread()); REQUIRE(sock->tid == isc_nm_tid()); @@ -650,25 +655,22 @@ isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) { } sock->accepting = false; - handle = isc__nmhandle_get(sock, NULL, &local); - INSIST(sock->accept_cb != NULL); - accept_cb = sock->accept_cb; - accept_cbarg = sock->accept_cbarg; sock->read_timeout = sock->mgr->init; - accept_cb(handle, ISC_R_SUCCESS, accept_cbarg); + + req = isc__nm_uvreq_get(sock->mgr, sock); + req->handle = isc__nmhandle_get(sock, NULL, &local); + req->cb.accept = sock->accept_cb; + req->cbarg = sock->accept_cbarg; + + isc__nm_acceptcb(sock, req, ISC_R_SUCCESS); /* * sock is now attached to the handle. */ isc__nmsocket_detach(&sock); - /* - * The accept callback should have attached to the handle. - * If it didn't, the socket will be closed now. - */ - isc_nmhandle_detach(&handle); return; error: @@ -733,9 +735,6 @@ tcp_listenclose_cb(uv_handle_t *handle) { static void failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { - isc_nm_recv_cb_t cb; - void *cbarg = NULL; - REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->statichandle != NULL); @@ -750,12 +749,28 @@ failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { uv_read_stop(&sock->uv_handle.stream); - cb = sock->recv_cb; - cbarg = sock->recv_cbarg; - isc__nmsocket_clearcb(sock); + if (sock->recv_cb != NULL) { + isc__nm_uvreq_t *req = isc__nm_uvreq_get(sock->mgr, sock); + isc_nmhandle_attach(sock->statichandle, &req->handle); + req->cb.recv = sock->recv_cb; + req->cbarg = sock->recv_cbarg; - if (cb != NULL) { - cb(sock->statichandle, result, NULL, cbarg); + isc__nmsocket_clearcb(sock); + + isc__nm_readcb(sock, req, result); + } +} + +static void +failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, + isc_result_t eresult) { + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(VALID_UVREQ(req)); + + if (req->cb.send != NULL) { + isc__nm_sendcb(sock, req, eresult); + } else { + isc__nm_uvreq_put(&req, sock); } } @@ -790,29 +805,17 @@ isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { REQUIRE(VALID_NMHANDLE(handle)); REQUIRE(VALID_NMSOCK(handle->sock)); - if (!isc__nmsocket_active(sock)) { + sock->recv_cb = cb; + sock->recv_cbarg = cbarg; + + if (inactive(sock)) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]); - cb(handle, ISC_R_CANCELED, NULL, cbarg); - return; - } - - if (sock->server != NULL && !isc__nmsocket_active(sock->server)) { - isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - cb(handle, ISC_R_CANCELED, NULL, cbarg); - return; - } - - if (atomic_load(&sock->mgr->closing)) { - isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - cb(handle, ISC_R_CANCELED, NULL, cbarg); + failed_read_cb(sock, ISC_R_CANCELED); return; } REQUIRE(sock->tid == isc_nm_tid()); - sock->recv_cb = cb; - sock->recv_cbarg = cbarg; - sock->read_timeout = (atomic_load(&sock->keepalive) ? sock->mgr->keepalive : sock->mgr->idle); @@ -864,17 +867,7 @@ isc__nm_async_tcp_startread(isc__networker_t *worker, isc__netievent_t *ev0) { REQUIRE(worker->id == isc_nm_tid()); - if (!isc__nmsocket_active(sock)) { - failed_read_cb(sock, ISC_R_CANCELED); - return; - } - - if (sock->server != NULL && !isc__nmsocket_active(sock->server)) { - failed_read_cb(sock, ISC_R_CANCELED); - return; - } - - if (atomic_load(&sock->mgr->closing)) { + if (inactive(sock)) { failed_read_cb(sock, ISC_R_CANCELED); return; } @@ -982,13 +975,22 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { REQUIRE(buf != NULL); if (nread >= 0) { - isc_region_t region = { .base = (unsigned char *)buf->base, - .length = nread }; - isc_nm_recv_cb_t cb = sock->recv_cb; - void *cbarg = sock->recv_cbarg; + if (sock->recv_cb != NULL) { + isc__nm_uvreq_t *req = isc__nm_uvreq_get(sock->mgr, + sock); + req->cb.recv = sock->recv_cb; + req->cbarg = sock->recv_cbarg; + isc_nmhandle_attach(sock->statichandle, &req->handle); - if (cb != NULL) { - cb(sock->statichandle, ISC_R_SUCCESS, ®ion, cbarg); + /* + * The callback will be called synchronously because the + * result is ISC_R_SUCCESS, so we don't need to retain + * the buffer + */ + req->uvbuf.base = buf->base; + req->uvbuf.len = nread; + + isc__nm_readcb(sock, req, ISC_R_SUCCESS); } if (sock->timer_initialized && sock->read_timeout != 0) { @@ -1183,24 +1185,6 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb, REQUIRE(sock->type == isc_nm_tcpsocket); - if (!isc__nmsocket_active(sock)) { - isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - cb(handle, ISC_R_CANCELED, cbarg); - return; - } - - if (sock->server != NULL && !isc__nmsocket_active(sock->server)) { - isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - cb(handle, ISC_R_CANCELED, cbarg); - return; - } - - if (atomic_load(&sock->mgr->closing)) { - isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - cb(handle, ISC_R_CANCELED, cbarg); - return; - } - uvreq = isc__nm_uvreq_get(sock->mgr, sock); uvreq->uvbuf.base = (char *)region->base; uvreq->uvbuf.len = region->length; @@ -1210,6 +1194,12 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb, uvreq->cb.send = cb; uvreq->cbarg = cbarg; + if (inactive(sock)) { + isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); + failed_send_cb(sock, uvreq, ISC_R_CANCELED); + return; + } + if (sock->tid == isc_nm_tid()) { /* * If we're in the same thread as the socket we can send the @@ -1219,8 +1209,7 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb, if (result != ISC_R_SUCCESS) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - uvreq->cb.send(uvreq->handle, result, uvreq->cbarg); - isc__nm_uvreq_put(&uvreq, sock); + failed_send_cb(sock, uvreq, result); } } else { /* @@ -1284,13 +1273,7 @@ tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(sock->type == isc_nm_tcpsocket); - if (!isc__nmsocket_active(sock)) { - return (ISC_R_CANCELED); - } - if (sock->server != NULL && !isc__nmsocket_active(sock->server)) { - return (ISC_R_CANCELED); - } - if (atomic_load(&sock->mgr->closing)) { + if (inactive(sock)) { return (ISC_R_CANCELED); } @@ -1408,11 +1391,6 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock) { } if (atomic_load(&sock->connecting)) { - if (sock->timer_initialized) { - isc__nm_uvreq_t *req = - uv_handle_get_data((uv_handle_t *)&sock->timer); - failed_connect_cb(sock, req, ISC_R_CANCELED); - } return; } diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index 2a1596d0e8..f742c64a65 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -787,11 +787,6 @@ tcpdnsconnect_cb(isc_nmhandle_t *handle, isc_result_t result, void *arg) { isc_mem_putanddetach(&conn->mctx, conn, sizeof(*conn)); - if (result != ISC_R_SUCCESS) { - cb(NULL, result, cbarg); - return; - } - dnssock = isc_mem_get(handle->sock->mgr->mctx, sizeof(*dnssock)); isc__nmsocket_init(dnssock, handle->sock->mgr, isc_nm_tcpdnssocket, handle->sock->iface); @@ -807,6 +802,13 @@ tcpdnsconnect_cb(isc_nmhandle_t *handle, isc_result_t result, void *arg) { readhandle = isc__nmhandle_get(dnssock, NULL, NULL); + if (result != ISC_R_SUCCESS) { + cb(readhandle, result, cbarg); + isc__nmsocket_detach(&dnssock); + isc_nmhandle_detach(&readhandle); + return; + } + INSIST(dnssock->statichandle != NULL); INSIST(dnssock->statichandle == readhandle); INSIST(readhandle->sock == dnssock); @@ -838,20 +840,26 @@ isc_result_t isc_nm_tcpdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, isc_nm_cb_t cb, void *cbarg, unsigned int timeout, size_t extrahandlesize) { - tcpconnect_t *conn = isc_mem_get(mgr->mctx, sizeof(tcpconnect_t)); + isc_result_t result = ISC_R_SUCCESS; + tcpconnect_t *conn = isc_mem_get(mgr->mctx, sizeof(*conn)); *conn = (tcpconnect_t){ .cb = cb, .cbarg = cbarg, .extrahandlesize = extrahandlesize }; isc_mem_attach(mgr->mctx, &conn->mctx); - return (isc_nm_tcpconnect(mgr, local, peer, tcpdnsconnect_cb, conn, - timeout, 0)); + result = isc_nm_tcpconnect(mgr, local, peer, tcpdnsconnect_cb, conn, + timeout, 0); + if (result != ISC_R_SUCCESS) { + isc_mem_putanddetach(&conn->mctx, conn, sizeof(*conn)); + } + return (result); } isc_result_t isc_nm_tlsdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, isc_nm_cb_t cb, void *cbarg, unsigned int timeout, size_t extrahandlesize) { + isc_result_t result = ISC_R_SUCCESS; tcpconnect_t *conn = isc_mem_get(mgr->mctx, sizeof(tcpconnect_t)); SSL_CTX *ctx = NULL; @@ -861,9 +869,12 @@ isc_nm_tlsdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, isc_mem_attach(mgr->mctx, &conn->mctx); ctx = SSL_CTX_new(SSLv23_client_method()); - isc_result_t result = isc_nm_tlsconnect( - mgr, local, peer, tcpdnsconnect_cb, conn, ctx, timeout, 0); + result = isc_nm_tlsconnect(mgr, local, peer, tcpdnsconnect_cb, conn, + ctx, timeout, 0); SSL_CTX_free(ctx); + if (result != ISC_R_SUCCESS) { + isc_mem_putanddetach(&conn->mctx, conn, sizeof(*conn)); + } return (result); } diff --git a/lib/isc/netmgr/tls.c b/lib/isc/netmgr/tls.c index acaaeb1ad5..fdface5aa2 100644 --- a/lib/isc/netmgr/tls.c +++ b/lib/isc/netmgr/tls.c @@ -79,6 +79,7 @@ tls_senddone(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) { UNUSED(handle); /* XXXWPK TODO */ UNUSED(eresult); + isc_mem_put(sock->mgr->mctx, sock->tls.senddata.base, sock->tls.senddata.length); sock->tls.senddata = (isc_region_t){ NULL, 0 }; @@ -701,7 +702,7 @@ tls_connect_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { REQUIRE(VALID_NMSOCK(tlssock)); if (result != ISC_R_SUCCESS) { - tlssock->connect_cb(NULL, result, tlssock->connect_cbarg); + tlssock->connect_cb(handle, result, tlssock->connect_cbarg); atomic_store(&tlssock->result, result); atomic_store(&tlssock->connect_error, true); tls_close_direct(tlssock); @@ -714,7 +715,7 @@ tls_connect_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { isc_nmhandle_attach(handle, &tlssock->outerhandle); result = initialize_tls(tlssock, false); if (result != ISC_R_SUCCESS) { - tlssock->connect_cb(NULL, result, tlssock->connect_cbarg); + tlssock->connect_cb(handle, result, tlssock->connect_cbarg); atomic_store(&tlssock->result, result); atomic_store(&tlssock->connect_error, true); tls_close_direct(tlssock); @@ -742,6 +743,7 @@ isc__nm_async_tlsconnect(isc__networker_t *worker, isc__netievent_t *ev0) { tls_connect_cb, tlssock, tlssock->connect_timeout, 0); if (result != ISC_R_SUCCESS) { + /* FIXME: We need to pass valid handle */ tlssock->connect_cb(NULL, result, tlssock->connect_cbarg); atomic_store(&tlssock->result, result); atomic_store(&tlssock->connect_error, true); diff --git a/lib/isc/netmgr/udp.c b/lib/isc/netmgr/udp.c index 9f8a37340e..ecaf66046a 100644 --- a/lib/isc/netmgr/udp.c +++ b/lib/isc/netmgr/udp.c @@ -51,8 +51,15 @@ static void failed_read_cb(isc_nmsocket_t *sock, isc_result_t result); static void -failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, - isc_result_t eresult); +failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, + isc_result_t eresult); + +static bool +inactive(isc_nmsocket_t *sock) { + return (!isc__nmsocket_active(sock) || + atomic_load(&sock->mgr->closing) || + (sock->server != NULL && !isc__nmsocket_active(sock->server))); +} isc_result_t isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, @@ -339,14 +346,11 @@ static void udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) { isc_result_t result; - isc_nmhandle_t *nmhandle = NULL; isc_sockaddr_t sockaddr; isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle); - isc_region_t region; + isc__nm_uvreq_t *req = NULL; uint32_t maxudp; bool free_buf; - isc_nm_recv_cb_t cb; - void *cbarg; REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); @@ -385,38 +389,35 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, goto done; } - region.base = (unsigned char *)buf->base; - region.length = nrecv; - - cb = sock->recv_cb; - cbarg = sock->recv_cbarg; - if (sock->timer_running) { uv_timer_stop(&sock->timer); sock->timer_running = false; } + req = isc__nm_uvreq_get(sock->mgr, sock); + req->cb.recv = sock->recv_cb; + req->cbarg = sock->recv_cbarg; + /* + * The callback will be called synchronously, because result is + * ISC_R_SUCCESS. + */ + req->uvbuf.base = buf->base; + req->uvbuf.len = nrecv; + if (atomic_load(&sock->client)) { if (nrecv < 0) { failed_read_cb(sock, isc__nm_uverr2result(nrecv)); return; } - cb(sock->statichandle, ISC_R_SUCCESS, ®ion, cbarg); + isc_nmhandle_attach(sock->statichandle, &req->handle); } else { result = isc_sockaddr_fromsockaddr(&sockaddr, addr); RUNTIME_CHECK(result == ISC_R_SUCCESS); - nmhandle = isc__nmhandle_get(sock, &sockaddr, NULL); - - cb(nmhandle, ISC_R_SUCCESS, ®ion, cbarg); - - /* - * If the recv callback wants to hold on to the handle, - * it needs to attach to it. - */ - isc_nmhandle_detach(&nmhandle); + req->handle = isc__nmhandle_get(sock, &sockaddr, NULL); } + isc__nm_readcb(sock, req, ISC_R_SUCCESS); done: if (free_buf) { @@ -440,21 +441,18 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb, uint32_t maxudp = atomic_load(&sock->mgr->maxudp); int ntid; - if (!isc__nmsocket_active(sock)) { - isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - cb(handle, ISC_R_CANCELED, cbarg); - return; - } + uvreq = isc__nm_uvreq_get(sock->mgr, sock); + uvreq->uvbuf.base = (char *)region->base; + uvreq->uvbuf.len = region->length; - if (sock->server != NULL && !isc__nmsocket_active(sock->server)) { - isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - cb(handle, ISC_R_CANCELED, cbarg); - return; - } + isc_nmhandle_attach(handle, &uvreq->handle); - if (atomic_load(&sock->mgr->closing)) { + uvreq->cb.send = cb; + uvreq->cbarg = cbarg; + + if (inactive(sock)) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - cb(handle, ISC_R_CANCELED, cbarg); + failed_send_cb(sock, uvreq, ISC_R_CANCELED); return; } @@ -467,6 +465,7 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb, * we need to do so here. */ if (maxudp != 0 && region->length > maxudp) { + isc__nm_uvreq_put(&uvreq, sock); isc_nmhandle_detach(&handle); return; } @@ -499,15 +498,6 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb, rsock = &psock->children[ntid]; } - uvreq = isc__nm_uvreq_get(sock->mgr, sock); - uvreq->uvbuf.base = (char *)region->base; - uvreq->uvbuf.len = region->length; - - isc_nmhandle_attach(handle, &uvreq->handle); - - uvreq->cb.send = cb; - uvreq->cbarg = cbarg; - if (isc_nm_tid() == rsock->tid) { /* * If we're in the same thread as the socket we can send @@ -518,8 +508,7 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb, if (result != ISC_R_SUCCESS) { isc__nm_incstats(rsock->mgr, rsock->statsindex[STATID_SENDFAIL]); - uvreq->cb.send(uvreq->handle, result, uvreq->cbarg); - isc__nm_uvreq_put(&uvreq, sock); + failed_send_cb(rsock, uvreq, result); } } else { /* @@ -549,23 +538,21 @@ isc__nm_async_udpsend(isc__networker_t *worker, isc__netievent_t *ev0) { REQUIRE(worker->id == sock->tid); if (!isc__nmsocket_active(ievent->sock)) { - uvreq->cb.send(uvreq->handle, ISC_R_CANCELED, uvreq->cbarg); - isc__nm_uvreq_put(&uvreq, sock); + failed_send_cb(sock, uvreq, ISC_R_CANCELED); return; } result = udp_send_direct(sock, uvreq, &ievent->peer); if (result != ISC_R_SUCCESS) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - uvreq->cb.send(uvreq->handle, result, uvreq->cbarg); - isc__nm_uvreq_put(&uvreq, sock); + failed_send_cb(sock, uvreq, result); } } static void udp_send_cb(uv_udp_send_t *req, int status) { isc_result_t result = ISC_R_SUCCESS; - isc__nm_uvreq_t *uvreq = (isc__nm_uvreq_t *)req->data; + isc__nm_uvreq_t *uvreq = uv_handle_get_data((uv_handle_t *)req); isc_nmsocket_t *sock = uvreq->sock; REQUIRE(VALID_UVREQ(uvreq)); @@ -576,8 +563,7 @@ udp_send_cb(uv_udp_send_t *req, int status) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); } - uvreq->cb.send(uvreq->handle, result, uvreq->cbarg); - isc__nm_uvreq_put(&uvreq, uvreq->sock); + isc__nm_sendcb(sock, uvreq, result); } /* @@ -595,13 +581,7 @@ udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(sock->type == isc_nm_udpsocket); - if (!isc__nmsocket_active(sock)) { - return (ISC_R_CANCELED); - } - if (sock->server != NULL && !isc__nmsocket_active(sock->server)) { - return (ISC_R_CANCELED); - } - if (atomic_load(&sock->mgr->closing)) { + if (inactive(sock)) { return (ISC_R_CANCELED); } @@ -625,7 +605,7 @@ udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, return (ISC_R_SUCCESS); } -static int +static isc_result_t udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { isc__networker_t *worker = NULL; int uv_bind_flags = UV_UDP_REUSEADDR; @@ -644,11 +624,9 @@ udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { /* Socket was never opened; no need for isc__nm_udp_close() */ atomic_store(&sock->closing, true); atomic_store(&sock->closed, true); - atomic_store(&sock->result, isc__nm_uverr2result(r)); atomic_store(&sock->connect_error, true); - failed_connect_cb(sock, req, isc__nm_uverr2result(r)); atomic_store(&sock->active, false); - return (r); + return (isc__nm_uverr2result(r)); } r = uv_udp_open(&sock->uv_handle.udp, sock->fd); @@ -656,10 +634,9 @@ udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]); atomic_store(&sock->connect_error, true); atomic_store(&sock->result, isc__nm_uverr2result(r)); - failed_connect_cb(sock, req, isc__nm_uverr2result(r)); atomic_store(&sock->active, false); isc__nm_udp_close(sock); - return (r); + return (isc__nm_uverr2result(r)); } isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPEN]); @@ -673,10 +650,9 @@ udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]); atomic_store(&sock->connect_error, true); atomic_store(&sock->result, isc__nm_uverr2result(r)); - failed_connect_cb(sock, req, isc__nm_uverr2result(r)); atomic_store(&sock->active, false); isc__nm_udp_close(sock); - return (r); + return (isc__nm_uverr2result(r)); } uv_handle_set_data(&sock->uv_handle.handle, sock); @@ -687,10 +663,9 @@ udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { sock->statsindex[STATID_CONNECTFAIL]); atomic_store(&sock->connect_error, true); atomic_store(&sock->result, isc__nm_uverr2result(r)); - failed_connect_cb(sock, req, isc__nm_uverr2result(r)); atomic_store(&sock->active, false); isc__nm_udp_close(sock); - return (r); + return (isc__nm_uverr2result(r)); } isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]); atomic_store(&sock->connecting, false); @@ -703,7 +678,7 @@ udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { uv_send_buffer_size(&sock->uv_handle.handle, &(int){ ISC_SEND_BUFFER_SIZE }); #endif - return (0); + return (ISC_R_SUCCESS); } /* @@ -716,37 +691,27 @@ isc__nm_async_udpconnect(isc__networker_t *worker, isc__netievent_t *ev0) { (isc__netievent_udpconnect_t *)ev0; isc_nmsocket_t *sock = ievent->sock; isc__nm_uvreq_t *req = ievent->req; - isc_nmhandle_t *handle = NULL; - isc_nm_cb_t cb; - void *cbarg; - int r; isc_result_t result; UNUSED(worker); + REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_udpsocket); REQUIRE(sock->iface != NULL); REQUIRE(sock->parent == NULL); REQUIRE(sock->tid == isc_nm_tid()); - cb = sock->connect_cb; - cbarg = sock->connect_cbarg; - - r = udp_connect_direct(sock, req); - if (r != 0) { - LOCK(&sock->lock); - SIGNAL(&sock->cond); - UNLOCK(&sock->lock); - return; + req->handle = isc__nmhandle_get(sock, &req->peer, &sock->iface->addr); + result = udp_connect_direct(sock, req); + atomic_store(&sock->result, result); + if (result == ISC_R_SUCCESS) { + atomic_store(&sock->connected, true); + isc__nm_connectcb(sock, req, ISC_R_SUCCESS); + } else { + atomic_store(&sock->connect_error, true); + isc__nm_uvreq_put(&req, sock); } - atomic_store(&sock->connected, true); - atomic_store(&sock->result, ISC_R_SUCCESS); - result = atomic_load(&sock->result); - - handle = isc__nmhandle_get(sock, &req->peer, &sock->iface->addr); - cb(handle, result, cbarg); - LOCK(&sock->lock); SIGNAL(&sock->cond); UNLOCK(&sock->lock); @@ -755,12 +720,6 @@ isc__nm_async_udpconnect(isc__networker_t *worker, isc__netievent_t *ev0) { * The sock is now attached to the handle. */ isc__nmsocket_detach(&sock); - - /* - * The connect callback should have attached to the handle. - * If it didn't, the socket will be closed now. - */ - isc_nmhandle_detach(&handle); } isc_result_t @@ -834,7 +793,6 @@ isc_nm_udpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, isc__nm_async_udpconnect(&mgr->workers[sock->tid], (isc__netievent_t *)event); isc__nm_put_ievent(mgr, event); - isc__nm_uvreq_put(&req, sock); } else { sock->tid = isc_random_uniform(mgr->nworkers); isc__nm_enqueue_ievent(&mgr->workers[sock->tid], @@ -846,7 +804,6 @@ isc_nm_udpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, WAIT(&sock->cond, &sock->lock); } UNLOCK(&sock->lock); - isc__nm_uvreq_put(&req, sock); } result = atomic_load(&sock->result); @@ -867,9 +824,6 @@ udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, static void failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { - isc_nm_recv_cb_t cb; - void *cbarg = NULL; - REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->statichandle != NULL); @@ -880,12 +834,28 @@ failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { uv_udp_recv_stop(&sock->uv_handle.udp); - cb = sock->recv_cb; - cbarg = sock->recv_cbarg; - isc__nmsocket_clearcb(sock); + if (sock->recv_cb != NULL) { + isc__nm_uvreq_t *req = isc__nm_uvreq_get(sock->mgr, sock); + isc_nmhandle_attach(sock->statichandle, &req->handle); + req->cb.recv = sock->recv_cb; + req->cbarg = sock->recv_cbarg; - if (cb != NULL) { - cb(sock->statichandle, result, NULL, cbarg); + isc__nmsocket_clearcb(sock); + + isc__nm_readcb(sock, req, result); + } +} + +static void +failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, + isc_result_t eresult) { + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(VALID_UVREQ(req)); + + if (req->cb.send != NULL) { + isc__nm_sendcb(sock, req, eresult); + } else { + isc__nm_uvreq_put(&req, sock); } } @@ -911,17 +881,7 @@ isc__nm_async_udpread(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_udpread_t *ievent = (isc__netievent_udpread_t *)ev0; isc_nmsocket_t *sock = ievent->sock; - if (!isc__nmsocket_active(sock)) { - failed_read_cb(sock, ISC_R_CANCELED); - return; - } - - if (sock->server != NULL && !isc__nmsocket_active(sock->server)) { - failed_read_cb(sock, ISC_R_CANCELED); - return; - } - - if (atomic_load(&sock->mgr->closing)) { + if (inactive(sock)) { failed_read_cb(sock, ISC_R_CANCELED); return; } @@ -950,24 +910,12 @@ isc__nm_udp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { REQUIRE(VALID_NMSOCK(handle->sock)); REQUIRE(handle->sock->type == isc_nm_udpsocket); - if (!isc__nmsocket_active(sock)) { + if (inactive(sock)) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]); cb(handle, ISC_R_CANCELED, NULL, cbarg); return; } - if (sock->server != NULL && !isc__nmsocket_active(sock->server)) { - isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - cb(handle, ISC_R_CANCELED, NULL, cbarg); - return; - } - - if (atomic_load(&sock->mgr->closing)) { - isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - cb(handle, ISC_R_CANCELED, NULL, cbarg); - return; - } - REQUIRE(sock->tid == isc_nm_tid()); sock->recv_cb = cb; sock->recv_cbarg = cbarg; @@ -1059,35 +1007,6 @@ isc__nm_udp_close(isc_nmsocket_t *sock) { } } -static void -failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, - isc_result_t eresult) { - REQUIRE(sock->tid == isc_nm_tid()); - - if (sock->timer_running) { - uv_timer_stop(&sock->timer); - sock->timer_running = false; - } - - if (!atomic_load(&sock->connecting)) { - return; - } - - atomic_store(&sock->connecting, false); - - INSIST(req != NULL); - - isc__nmsocket_clearcb(sock); - - if (req->cb.connect != NULL) { - req->cb.connect(NULL, eresult, req->cbarg); - } - req->cb.connect = NULL; - req->cbarg = NULL; - - isc__nmsocket_detach(&sock); -} - void isc__nm_udp_shutdown(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); @@ -1098,11 +1017,6 @@ isc__nm_udp_shutdown(isc_nmsocket_t *sock) { } if (atomic_load(&sock->connecting)) { - if (sock->timer_initialized) { - isc__nm_uvreq_t *req = - uv_handle_get_data((uv_handle_t *)&sock->timer); - failed_connect_cb(sock, req, ISC_R_CANCELED); - } return; }