writing works now, too.

This commit is contained in:
Michael Graff 1998-11-11 00:43:14 +00:00
parent 59b2e9c361
commit 3d8e572880
3 changed files with 429 additions and 38 deletions

View file

@ -40,11 +40,33 @@ my_shutdown(isc_task_t task, isc_event_t event)
return (ISC_TRUE);
}
static isc_boolean_t
my_send(isc_task_t task, isc_event_t event)
{
isc_socket_t sock;
isc_socketevent_t dev;
sock = event->sender;
dev = (isc_socketevent_t)event;
printf("my_send: %s task %p\n\t(sock %p, base %p, length %d, n %d, result %d)\n",
(char *)(event->arg), task, sock,
dev->region.base, dev->region.length,
dev->n, dev->result);
isc_mem_put(event->mctx, dev->region.base, dev->region.length);
isc_event_free(&event);
return (0);
}
static isc_boolean_t
my_recv(isc_task_t task, isc_event_t event)
{
isc_socket_t sock;
isc_socketevent_t dev;
struct isc_region region;
char buf[1024];
sock = event->sender;
dev = (isc_socketevent_t)event;
@ -59,12 +81,24 @@ my_recv(isc_task_t task, isc_event_t event)
isc_event_free(&event);
return (1);
return (0);
}
/*
* Echo the data back
*/
region = dev->region;
region.base[20] = 0;
snprintf(buf, sizeof buf, "Received: %s\r\n", region.base);
region.base = isc_mem_get(event->mctx, strlen(buf) + 1);
region.length = strlen(buf) + 1;
strcpy(region.base, buf); /* strcpy is safe */
isc_socket_send(sock, &region, task, my_send, event->arg);
isc_socket_recv(sock, &dev->region, ISC_FALSE,
task, my_recv, event->arg);
isc_event_free(&event);
return (0);
@ -75,7 +109,7 @@ my_listen(isc_task_t task, isc_event_t event)
{
char *name = event->arg;
isc_socket_newconnev_t dev;
isc_region_t region;
struct isc_region region;
dev = (isc_socket_newconnev_t)event;
@ -89,16 +123,13 @@ my_listen(isc_task_t task, isc_event_t event)
*/
isc_socket_accept(event->sender, task, my_listen, event->arg);
region = isc_mem_get(event->mctx, sizeof(*region));
INSIST(region != NULL);
region->base = isc_mem_get(event->mctx, 21);
region->length = 20;
region.base = isc_mem_get(event->mctx, 21);
region.length = 20;
/*
* queue up a read on this socket
*/
isc_socket_recv(dev->newsocket, region, ISC_FALSE,
isc_socket_recv(dev->newsocket, &region, ISC_FALSE,
task, my_recv, event->arg);
} else {
/*

View file

@ -1,4 +1,4 @@
/* $Id: socket.h,v 1.4 1998/11/10 11:37:54 explorer Exp $ */
/* $Id: socket.h,v 1.5 1998/11/11 00:43:14 explorer Exp $ */
#ifndef ISC_SOCKET_H
#define ISC_SOCKET_H 1
@ -410,6 +410,10 @@ isc_socket_recv(isc_socket_t socket, isc_region_t region,
isc_result_t
isc_socket_send(isc_socket_t socket, isc_region_t region,
isc_task_t task, isc_taskaction_t action, void *arg);
isc_result_t
isc_socket_sendto(isc_socket_t socket, isc_region_t region,
isc_task_t task, isc_taskaction_t action, void *arg,
isc_sockaddr_t address, int addrlength);
/*
* Send the contents of 'region' to the socket's peer.
*

View file

@ -1,4 +1,4 @@
/* $Id: socket.c,v 1.5 1998/11/10 11:37:53 explorer Exp $ */
/* $Id: socket.c,v 1.6 1998/11/11 00:43:13 explorer Exp $ */
#include "attribute.h"
@ -59,8 +59,6 @@ typedef struct isc_socket_intev {
isc_socketevent_t done_ev; /* the done event to post */
isc_boolean_t partial; /* partial i/o ok */
isc_boolean_t canceled; /* I/O was canceled */
isc_taskaction_t action; /* listen needs this too */
void *arg; /* listen needs this too */
LINK(struct isc_socket_intev) link;
} *isc_socket_intev_t;
@ -69,7 +67,6 @@ typedef struct isc_socket_ncintev {
isc_boolean_t canceled;
isc_task_t task;
isc_socket_newconnev_t done; /* the done event */
isc_socket_t sock; /* the socket we will pass or destroy */
LINK(struct isc_socket_ncintev) link;
} *isc_socket_ncintev_t;
@ -126,6 +123,7 @@ static void rwdone_event_destroy(isc_event_t);
static void free_socket(isc_socket_t *);
static isc_result_t allocate_socket(isc_socketmgr_t, isc_sockettype_t,
isc_socket_t *);
static void destroy(isc_socket_t *);
/*
* poke the select loop when there is something for us to do. Manager must
@ -201,6 +199,7 @@ static void
rwdone_event_destroy(isc_event_t ev)
{
isc_socket_t sock = ev->sender;
isc_boolean_t kill_socket = ISC_FALSE;
/*
* detach from the socket. We would have already detached from the
@ -213,17 +212,19 @@ rwdone_event_destroy(isc_event_t ev)
XTRACE(("rwdone_event_destroy: sock %p, ref cnt == %d\n",
sock, sock->references));
/*
* XXX need to free socket on cnt = 0 here
*/
if (sock->references == 0)
kill_socket = ISC_TRUE;
UNLOCK(&sock->lock);
if (kill_socket)
destroy(&sock);
}
static void
ncdone_event_destroy(isc_event_t ev)
{
isc_socket_t sock = ev->sender;
isc_boolean_t kill_socket = ISC_FALSE;
/*
* detach from the socket. We would have already detached from the
@ -231,7 +232,13 @@ ncdone_event_destroy(isc_event_t ev)
*/
LOCK(&sock->lock);
sock->references--;
if (sock->references == 0)
kill_socket = ISC_TRUE;
UNLOCK(&sock->lock);
if (kill_socket)
destroy(&sock);
}
/*
@ -240,10 +247,13 @@ ncdone_event_destroy(isc_event_t ev)
* Caller must ensure locking.
*/
static void
destroy(isc_socket_t sock)
destroy(isc_socket_t *sockp)
{
isc_socket_t sock = *sockp;
isc_socketmgr_t manager = sock->manager;
XTRACE(("destroy sockp = %p, sock = %p\n", sockp, sock));
LOCK(&manager->lock);
/*
@ -255,7 +265,7 @@ destroy(isc_socket_t sock)
UNLOCK(&manager->lock);
free_socket(&sock);
free_socket(sockp);
}
static isc_result_t
@ -312,7 +322,7 @@ free_socket(isc_socket_t *socketp)
{
isc_socket_t sock = *socketp;
REQUIRE(sock->references == 1);
REQUIRE(sock->references == 0);
REQUIRE(VALID_SOCKET(sock));
REQUIRE(!sock->listener);
REQUIRE(!sock->pending_read);
@ -323,7 +333,7 @@ free_socket(isc_socket_t *socketp)
sock->magic = 0;
if (sock->fd == -1) {
if (sock->fd != -1) {
close(sock->fd);
sock->fd = -1;
}
@ -405,6 +415,8 @@ isc_socket_create(isc_socketmgr_t manager, isc_sockettype_t type,
UNLOCK(&manager->lock);
sock->references++;
*socketp = sock;
XEXIT("isc_socket_create");
@ -436,7 +448,7 @@ void
isc_socket_detach(isc_socket_t *socketp)
{
isc_socket_t sock;
isc_boolean_t free_socket = ISC_FALSE;
isc_boolean_t kill_socket = ISC_FALSE;
REQUIRE(socketp != NULL);
sock = *socketp;
@ -448,11 +460,11 @@ isc_socket_detach(isc_socket_t *socketp)
REQUIRE(sock->references > 0);
sock->references--;
if (sock->references == 0)
free_socket = ISC_TRUE;
kill_socket = ISC_TRUE;
UNLOCK(&sock->lock);
if (free_socket)
destroy(sock);
if (kill_socket)
destroy(&sock);
XEXIT("isc_socket_detach");
@ -556,9 +568,6 @@ send_ncdone_event(isc_socket_t sock, isc_socket_ncintev_t *iev,
isc_task_send((*iev)->task, (isc_event_t *)dev);
(*iev)->done = NULL;
if ((*iev)->sock)
free_socket(&(*iev)->sock);
isc_event_free((isc_event_t *)iev);
}
@ -725,7 +734,7 @@ internal_read(isc_task_t task, isc_event_t ev)
* we can.
*/
read_count = dev->region.length - dev->n;
cc = read(sock->fd, dev->region.base + dev->n, read_count);
cc = recv(sock->fd, dev->region.base + dev->n, read_count, 0);
XTRACE(("internal_read: read(%d) %d\n", sock->fd, cc));
@ -801,6 +810,150 @@ internal_read(isc_task_t task, isc_event_t ev)
return (0);
}
static isc_boolean_t
internal_write(isc_task_t task, isc_event_t ev)
{
isc_socket_intev_t iev;
isc_socketevent_t dev;
isc_socket_t sock;
int cc;
size_t write_count;
/*
* Find out what socket this is and lock it.
*/
sock = (isc_socket_t)ev->sender;
LOCK(&sock->lock);
INSIST(sock->pending_write == ISC_TRUE);
sock->pending_write = ISC_FALSE;
XTRACE(("internal_write: sock %p, fd %d\n", sock, sock->fd));
/*
* Pull the first entry off the list, and look at it. If it is
* NULL, or not ours, something bad happened.
*/
iev = HEAD(sock->write_list);
INSIST(iev != NULL);
INSIST(iev->task == task);
/*
* Try to do as much I/O as possible on this socket. There are no
* limits here, currently. If some sort of quantum write count is
* desired before giving up control, make certain to process markers
* regardless of quantum.
*/
do {
iev = HEAD(sock->write_list);
dev = iev->done_ev;
/*
* check for canceled I/O
*/
if (iev->canceled) {
DEQUEUE(sock->write_list, iev, link);
isc_event_free((isc_event_t *)&iev);
continue;
}
/*
* If this is a marker event, post its completion and
* continue the loop.
*/
if (dev->common.type == ISC_SOCKEVENT_SENDMARK) {
send_rwdone_event(sock, &iev, &dev, ISC_R_SUCCESS);
continue;
}
/*
* It must be a write request. Try to satisfy it as best
* we can.
*/
write_count = dev->region.length - dev->n;
if (sock->type == isc_socket_udp)
cc = sendto(sock->fd, dev->region.base + dev->n,
write_count, 0,
(struct sockaddr *)&dev->address,
dev->addrlength);
else
cc = send(sock->fd, dev->region.base + dev->n,
write_count, 0);
XTRACE(("internal_write: send(%d) %d\n", sock->fd, cc));
/*
* check for error or block condition
*/
if (cc < 0) {
if (cc == EWOULDBLOCK)
goto poke;
UNEXPECTED_ERROR(__FILE__, __LINE__,
"internal_write: %s",
strerror(errno));
INSIST(cc >= 0);
}
/*
* write of 0 means the remote end was closed. Run through
* the event queue and dispatch all the events with an EOF
* result code. This will set the EOF flag in markers as
* well, but that's really ok.
*/
if (cc == 0) {
do {
send_rwdone_event(sock, &iev, &dev,
ISC_R_EOF);
iev = HEAD(sock->write_list);
} while (iev != NULL);
goto poke;
}
/*
* if we write less than we expected, update counters,
* poke.
*/
if ((size_t)cc < write_count) {
dev->n += cc;
/*
* If partial writes are allowed, we return whatever
* was read with a success result, and continue
* the loop.
*/
if (iev->partial) {
send_rwdone_event(sock, &iev, &dev,
ISC_R_SUCCESS);
continue;
}
/*
* Partials not ok. Exit the loop and notify the
* watcher to wait for more writes
*/
goto poke;
}
/*
* Exactly what we wanted to read. We're done with this
* entry. Post its completion event.
*/
if ((size_t)cc == write_count) {
dev->n += write_count;
send_rwdone_event(sock, &iev, &dev, ISC_R_SUCCESS);
}
} while (!EMPTY(sock->write_list));
poke:
if (!EMPTY(sock->write_list))
select_poke(sock->manager, sock->fd);
UNLOCK(&sock->lock);
return (0);
}
/*
* This is the thread that will loop forever, always in a select or poll
* call.
@ -833,12 +986,12 @@ watcher(void *uap)
done = ISC_FALSE;
while (!done) {
readfds = manager->read_fds;
writefds = manager->write_fds;
UNLOCK(&manager->lock);
do {
readfds = manager->read_fds;
writefds = manager->write_fds;
UNLOCK(&manager->lock);
cc = select(FD_SETSIZE, &readfds, &writefds, NULL,
NULL);
if (cc < 0) {
@ -847,9 +1000,10 @@ watcher(void *uap)
"select failed: %s",
strerror(errno));
}
LOCK(&manager->lock);
} while (cc < 0);
LOCK(&manager->lock);
XTRACE(("watcher got manager lock\n"));
@ -1089,6 +1243,7 @@ isc_socket_recv(isc_socket_t sock, isc_region_t region,
isc_socket_intev_t iev;
isc_socketmgr_t manager;
isc_task_t ntask = NULL;
int cc;
manager = sock->manager;
@ -1121,6 +1276,12 @@ isc_socket_recv(isc_socket_t sock, isc_region_t region,
sock->references++; /* attach to socket in cheap way */
/*
* UDP sockets are always partial read
*/
if (sock->type == isc_socket_udp)
partial = ISC_TRUE;
/*
* Remember that we need to detach on event free
*/
@ -1132,22 +1293,59 @@ isc_socket_recv(isc_socket_t sock, isc_region_t region,
/*
* If the read queue is empty, try to do the I/O right now.
*/
#if 0
if (EMPTY(sock->read_list)) {
cc = recv(...);
if (sock->type == isc_socket_udp) {
cc = recvfrom(sock->fd, ev->region.base,
ev->region.length, 0,
(struct sockaddr *)&ev->address,
&ev->addrlength);
} else {
cc = recv(sock->fd, ev->region.base,
ev->region.length, 0);
ev->address = sock->address;
ev->addrlength = sock->addrlength;
}
if (cc < 0) {
if (cc == EWOULDBLOCK)
goto queue;
UNEXPECTED_ERROR(__FILE__, __LINE__,
"isc_socket_recv: %s",
strerror(errno));
INSIST(cc >= 0);
}
if (cc == 0) {
ev->result = ISC_R_EOF;
isc_task_send(task, (isc_event_t *)&ev);
UNLOCK(&sock->lock);
return (ISC_R_SUCCESS);
}
ev->n = cc;
/*
* Partial reads need to be queued
*/
if ((size_t)cc != ev->region.length && !partial)
goto queue;
/*
* full reads are posted, or partials if partials are ok.
*/
isc_task_send(task, (isc_event_t *)&ev);
UNLOCK(&sock->lock);
return (ISC_R_SUCCESS);
}
#endif
/*
* We couldn't read all or part of the request right now, so queue
* it.
*/
queue:
iev = sock->riev;
sock->riev = NULL;
@ -1176,6 +1374,161 @@ isc_socket_recv(isc_socket_t sock, isc_region_t region,
return (ISC_R_SUCCESS);
}
isc_result_t
isc_socket_send(isc_socket_t sock, isc_region_t region,
isc_task_t task, isc_taskaction_t action, void *arg)
{
return isc_socket_sendto(sock, region, task, action, arg, NULL, 0);
}
isc_result_t
isc_socket_sendto(isc_socket_t sock, isc_region_t region,
isc_task_t task, isc_taskaction_t action, void *arg,
isc_sockaddr_t address, int addrlength)
{
isc_socketevent_t ev;
isc_socket_intev_t iev;
isc_socketmgr_t manager;
isc_task_t ntask = NULL;
int cc;
manager = sock->manager;
ev = (isc_socketevent_t)isc_event_allocate(manager->mctx, sock,
ISC_SOCKEVENT_SENDDONE,
action, arg, sizeof(*ev));
if (ev == NULL)
return (ISC_R_NOMEMORY);
LOCK(&sock->lock);
if (sock->wiev == NULL) {
iev = (isc_socket_intev_t)isc_event_allocate(manager->mctx,
sock,
ISC_SOCKEVENT_INTIO,
internal_write,
sock,
sizeof(*iev));
if (iev == NULL) {
/* no special free routine yet */
isc_event_free((isc_event_t *)&ev);
return (ISC_R_NOMEMORY);
}
INIT_LINK(iev, link);
sock->wiev = iev;
iev = NULL; /* just in case */
}
sock->references++; /* attach to socket in cheap way */
/*
* Remember that we need to detach on event free
*/
ev->common.destroy = rwdone_event_destroy;
ev->region = *region;
ev->n = 0;
/*
* If the write queue is empty, try to do the I/O right now.
*/
if (sock->type == isc_socket_udp) {
INSIST(addrlength > 0 || sock->addrlength > 0);
if (addrlength > 0) {
ev->address = *address;
ev->addrlength = addrlength;
} else if (sock->addrlength > 0) {
ev->address = sock->address;
ev->addrlength = sock->addrlength;
}
} else if (sock->type == isc_socket_tcp) {
INSIST(address == NULL);
INSIST(addrlength == 0);
ev->address = sock->address;
ev->addrlength = sock->addrlength;
}
if (EMPTY(sock->write_list)) {
if (sock->type == isc_socket_udp)
cc = sendto(sock->fd, ev->region.base,
ev->region.length, 0,
(struct sockaddr *)&ev->address,
ev->addrlength);
else if (sock->type == isc_socket_tcp)
cc = send(sock->fd, ev->region.base,
ev->region.length, 0);
else
cc = -1; /* XXX */
if (cc < 0) {
if (cc == EWOULDBLOCK)
goto queue;
UNEXPECTED_ERROR(__FILE__, __LINE__,
"isc_socket_send: %s",
strerror(errno));
INSIST(cc >= 0);
}
if (cc == 0) {
ev->result = ISC_R_EOF;
isc_task_send(task, (isc_event_t *)&ev);
UNLOCK(&sock->lock);
return (ISC_R_SUCCESS);
}
ev->n = cc;
/*
* Partial writes need to be queued
*/
if ((size_t)cc != ev->region.length)
goto queue;
/*
* full writes are posted.
*/
isc_task_send(task, (isc_event_t *)&ev);
UNLOCK(&sock->lock);
return (ISC_R_SUCCESS);
}
/*
* We couldn't send all or part of the request right now, so queue
* it.
*/
queue:
iev = sock->wiev;
sock->wiev = NULL;
isc_task_attach(task, &ntask);
iev->done_ev = ev;
iev->task = ntask;
iev->partial = ISC_FALSE; /* doesn't matter */
/*
* Enqueue the request. If the socket was previously not being
* watched, poke the watcher to start paying attention to it.
*/
if (EMPTY(sock->write_list)) {
ENQUEUE(sock->write_list, iev, link);
select_poke(sock->manager, sock->fd);
} else {
ENQUEUE(sock->write_list, iev, link);
}
XTRACE(("isc_socket_send: posted ievent %p, dev %p, task %p\n",
iev, iev->done_ev, task));
UNLOCK(&sock->lock);
return (ISC_R_SUCCESS);
}
isc_result_t
isc_socket_bind(isc_socket_t sock, struct isc_sockaddr *sockaddr,
int addrlen)
@ -1242,6 +1595,9 @@ isc_socket_listen(isc_socket_t sock, int backlog)
return (ISC_R_UNEXPECTED);
}
if (backlog == 0)
backlog = SOMAXCONN;
if (listen(sock->fd, backlog) < 0) {
UNLOCK(&sock->lock);
UNEXPECTED_ERROR(__FILE__, __LINE__, "listen: %s",