/***************************************************************************** * * NCSA DTM version 2.3 * May 1, 1992 * * NCSA DTM Version 2.3 source code and documentation are in the public * domain. Specifically, we give to the public domain all rights for future * licensing of the source code, all resale rights, and all publishing rights. * * We ask, but do not require, that the following message be included in all * derived works: * * Portions developed at the National Center for Supercomputing Applications at * the University of Illinois at Urbana-Champaign. * * THE UNIVERSITY OF ILLINOIS GIVES NO WARRANTY, EXPRESSED OR IMPLIED, FOR THE * SOFTWARE AND/OR DOCUMENTATION PROVIDED, INCLUDING, WITHOUT LIMITATION, * WARRANTY OF MERCHANTABILITY AND WARRANTY OF FITNESS FOR A PARTICULAR PURPOSE * *****************************************************************************/ /*************************************************************************** ** ** dtm.c Contains the user routines for the DTM library. These ** routines provide inter-machine message passing for IPC. ** Some of the features include: ** - automatic type conversion ** - synchronous and asynchronous operation ** - flexible message format, which is user configurable ** - high-level message types (SDS, RIS, SDL) ** ***************************************************************************/ /* * $Log: dtm.c,v $ * Revision 1.5 1996/06/06 19:47:32 spowers * Linux is brain ded. * * Revision 1.4 1996/02/18 23:40:10 spowers * PROTO -> DTM_PROTO * * Revision 1.3 1995/10/14 22:07:23 spowers * Bzero and Bcopy removed...memset memcpy used instead. * * Revision 1.2 1995/10/13 06:33:05 spowers * Solaris support added. * * Revision 1.1.1.1 1995/01/11 00:02:58 alanb * New CVS source tree, Mosaic 2.5 beta 4 * * Revision 2.5 1994/12/29 23:39:36 alanb * I'm committing with a new symbolic revision number. * * Revision 1.1.1.1 1994/12/28 21:37:30 alanb * * Revision 1.2 1993/10/29 03:46:46 marca * Tweaks. 
* * Revision 1.1.1.1 1993/07/04 00:03:10 marca * Mosaic for X version 2 distribution * * Revision 1.1 1993/01/18 21:50:13 marca * I think I got it now. * * Revision 1.46 92/05/05 22:27:50 jplevyak * Corrected X interface code. * * Revision 1.45 1992/04/30 20:25:27 jplevyak * Changed Version to 2.3. * * Revision 1.44 1992/04/29 21:55:22 jplevyak * Add new function DTMgetConnectionCount. Add support for DTMaddInput. * * Revision 1.43 1992/03/20 21:14:40 jplevyak * Logical outports are now deleted from the list when the connection * is dropped instead of trying to reconnect. * * Revision 1.42 1992/03/19 00:21:31 jplevyak * Use disconnect_out_port instead of just dropping the connection in * availWrite. Fix Fix. * * Revision 1.41 92/03/18 22:25:44 jplevyak * Fix DTMEOF bug with absolute addressed. * * Revision 1.40 1992/03/16 20:38:36 creiman * Added #include "arch.h" * * Revision 1.39 1992/03/16 17:30:07 jplevyak * Fix selectRead to work with outports. * * Revision 1.38 1992/03/10 22:07:10 jplevyak * Added changed for PC/MAC from Quincey Koziol (koziol@ncsa.uiuc.edu) * with modification. * * Revision 1.37 1992/03/02 18:29:41 jplevyak * Fixed bug in EAGAIN handling. * * Revision 1.36 1992/03/02 17:20:14 jplevyak * Temporary back out. * * Revision 1.34 1992/02/28 03:40:24 jplevyak * Enhancement for ASYNC. int/long conflict (makes no difference on workstations). * * Revision 1.33 1992/02/27 23:41:46 jplevyak * Ensure that the seqstart flag is always set when the seq start * message is sent. Return an error if endWrite is called before * begin write. * Fix bug in writeMsg. * * Revision 1.32 1992/02/18 21:33:53 jefft * DTMerrno was not being cleared at the start of DTMavailRead. Now it is. * * Revision 1.31 1992/02/18 14:01:39 jplevyak * Added DTMaddInPortSocket. Use _ARCH_MACOS for macintosh. * * Revision 1.30 1992/01/30 19:27:01 jplevyak * Don't write ack to dropped connections. Use timeout of -1 as * an infinite wait to DTMselectRead. 
* * Revision 1.29 92/01/25 14:44:04 jplevyak * Fixed minor bug with DTM_ASYNC and another in AvailWrite * * Revision 1.28 1992/01/14 19:40:09 creiman * #ifndef macintosh for accept_read_connections call to dtm_sigio * * Revision 1.27 1992/01/14 19:35:47 creiman * Removed malloc.h from mac version * Made clear_write_flags return DTMNOERR * * Revision 1.26 1992/01/14 16:31:40 creiman * Removed mac #include * * Revision 1.25 1992/01/02 21:06:13 jefft * Whoops. Checked-in with a bug in DTMcheckRoute. Fixed it! * * Revision 1.24 1992/01/02 20:43:55 jefft * Modified DTMcheckRoute when used with absolute port address. It now * returns TRUE the first time called and FALSE there after. * * Revision 1.23 1991/12/09 18:36:18 jplevyak * Added support for Callback ( DTMreadReady ). * * Revision 1.22 1991/11/22 21:30:06 jplevyak * Added handling for fDiscard flag which allows the server to * tell a writer to send the data to /dev/null * * Revision 1.21 1991/11/15 19:42:34 jplevyak * Fixed Writing so that that connections are dropped silently on EOF. * * Revision 1.20 1991/11/01 23:07:55 jplevyak * New version of DTMselectRead. * * Revision 1.19 1991/11/01 22:42:10 jplevyak * Testing a new DTMselectRead. * * Revision 1.18 1991/11/01 21:11:30 jplevyak * More fixes for physical ports. * * Revision 1.17 1991/11/01 20:42:14 jplevyak * Fixed bug with physical addressing. * * Revision 1.16 1991/10/29 22:05:15 sreedhar * #include added * * Revision 1.15 1991/10/29 16:37:41 jplevyak * Made DTMreadSelect retry when it gets a new connnection. * * Revision 1.14 1991/10/15 18:19:55 jplevyak * Reorder some functions to make the SGIs happy. * * Revision 1.13 91/10/14 16:46:18 jplevyak * Added loop for detecting dropped connections during beginRead. * * Revision 1.12 1991/10/11 20:21:50 jplevyak * Fixed bug with multiple senders one receiver. * Added function DTMcheckRoute. * * Revision 1.11 1991/10/10 14:22:36 jplevyak * Complete major rework. Complete testing when the test * suite is done. 
 *
 * Revision 1.10  91/09/26  20:22:43  jplevyak
 * First stage of reorganization.  Use external/internal port mapping.
 * Use repackaged dtm_get_nlist and select_one.  Fix bugs with availWrite.
 *
 * Revision 1.9  1991/09/13  17:34:10  sreedhar
 * changes for DTMSYNC, DTMNOSYNC quality of service
 *
 * Revision 1.7  1991/08/15  18:57:06  sreedhar
 * Changes for logical portname version
 *
 * Revision 1.5  1991/06/11  15:19:04  sreedhar
 * WRITEMSG option - sequence start, end of message also put in same writev call.
 *
 * Revision 1.4  1991/06/07  15:58:10  sreedhar
 * Changes for "Sequence start" message
 *
 * Revision 1.3  1991/05/30  15:51:31  sreedhar
 * Changes for readMsg/writeMsg internal release
 *
 * Revision 1.2  1990/12/19  17:28:50  jefft
 * DTMerrno is now cleared at the beginning of each DTM routine.  This
 * allows application to continue when possible.
 *
 * Revision 1.1  90/11/08  16:12:20  jefft
 * Initial revision
 *
 */

/*
	Overview of DTM message transfer protocol.

	1. TCP connection is established between sender and receiver.

	2. Sender sends the "sequence starts" message.

	3. Sender awaits "ack from seq start" from the receiver.
	   Receipt of ack by sender guarantees the sender that
	   receiver will definitely accept at least the first user message
	   sent by the sender.  Sender can then send as many user messages
	   as it wants to ( they would be accepted by receiver ).

	4. Sender sends the user's header message and user data messages.

	5. Receiver will keep accepting user messages on current
	   connection unless a new connection request is received, which
	   would be accepted after bumping the current connection.

	6. Sender would send "Messages over" message after it sends all
	   user messages.  Receiver would accept same.

	Graphic picture

		Sender					Receiver

				Connect request
			-------------->

				Sequence starts
	|		-------------->
	|
	|			Ack for seq start
	|		<----------------
	|
	|			User header
	|		-------------->
	|
	| --> a sequence of
	|     BEGINWRITE,
	|     WRITEDATASET,	User message 1
	|     ENDWRITE		-------------->
	|			...............
| or equivalently, ............... | WRITEMSG | User message n | --------------> | | Messages over | --------------> | | ............... More sequences as above ............... A "sequence starts" message can be sent in availWrite or beginWrite. When no "Ack for header" is received or a write fails. Note that the "ack for header", "message over" and "sequence starts" messages are called called "ack" in DTM terminology ( send_ack, recv_ack calls used for all these ). */ #include #include #include #include "arch.h" #if defined(_ARCH_MSDOS) #include #include #include "uio.h" #else #include #include #include #include #endif #ifdef SOLARIS #include #endif #ifdef RS6000 #include #endif #ifndef _ARCH_MACOS # if defined( _ARCH_CONVEX ) || defined( NEXT ) # include # else # include # endif #endif #define MAIN #include "dtm.h" #include "dtmint.h" #include "debug.h" #define DTM_IOV_WRITE_SIZE 6 typedef struct { struct iovec iovec[ DTM_IOV_WRITE_SIZE ]; int32 iovsize; int32 iovlen; int32 rts_data; int32 hdr_size; int32 data_size; int32 end_data; } IOV_BUF; unsigned int uDTMdbg; #ifdef DTM_PROTOTYPES /* STATIC FUNCTION PROTOTYPES */ static int destroy_out_port DTM_PROTO(( DTMPORT *pp, Outport **)); static int select_one_connection DTM_PROTO(( int )); static Inport * new_in_port DTM_PROTO(( DTMPORT *pp,int fd )); static int32 select_one DTM_PROTO(( int connfd )); static Inport * inc_in_port DTM_PROTO(( DTMPORT *pp, Inport *inp )); static void inc_nextToRead DTM_PROTO(( DTMPORT *pp )); static int send_cts DTM_PROTO(( DTMPORT *pp, int fWait )); static int accept_one_header DTM_PROTO(( DTMPORT *pp, void *header, int size )); static int make_out_connections DTM_PROTO(( DTMPORT *pp )); static void make_write_iov DTM_PROTO(( IOV_BUF *iov, int fStartSeq, int fEndSeq, char *hdr, int hdrsize, VOIDPTR data, int datasize )); static int outp_count DTM_PROTO(( DTMPORT *pp )); static int writev_buffer DTM_PROTO(( DTMPORT *pp, IOV_BUF *iov_buf, int fStartSeq )); static int 
check_header_write_ack DTM_PROTO(( DTMPORT *pp )); static int verify_out_connections DTM_PROTO(( DTMPORT *pp )); static int clear_write_flags DTM_PROTO(( DTMPORT *pp )); #endif /* destroy_out_port() destroy an out port, including status flags. */ #ifdef DTM_PROTOTYPES static int destroy_out_port( DTMPORT *pp, Outport **ppcur ) #else static int destroy_out_port( pp, ppcur ) DTMPORT * pp; Outport ** ppcur; #endif { static Outport dummyForLoops; Outport * pcur = *ppcur; /* What should we do with a close error here? Send to the server...? -john */ (void) dtm_end_connect( pcur->connfd ); pcur->seqstart = FALSE ; pcur->availwrite = FALSE ; pcur->connfd = DTM_NO_CONNECTION ; /* Delete the port from the list */ if ( pp->out == pcur ) pp->out = pcur->next; else { Outport * outp; FOR_EACH_OUT_PORT( outp, pp ) { if ( outp->next == pcur ) { outp->next = pcur->next; break; } } } /* Most (if not all) of the users for this function will have it embedded in a FOR_EACH_OUT_PORT loop. In order that this loop is not endangered, the loop counter is temporarily redirected to dummyForLoops (a copy of the outport. */ dummyForLoops = *pcur; *ppcur = &dummyForLoops; free( pcur ); if ( !pp->fLogical ) { DTMerrno = DTMEOF; return DTMERROR; } return DTM_OK; } /* outp_count() Returns the number of outports attached to the given DTM port */ #ifdef DTM_PROTOTYPES static int outp_count(DTMPORT *pp ) #else static int outp_count( pp ) DTMPORT *pp; #endif { int i = 0; Outport * inp; FOR_EACH_OUT_PORT( inp, pp ) { i++; } return i; } /* make_out_connections() For each new out port, attmpt to make the connection. 
*/ #ifdef DTM_PROTOTYPES static int make_out_connections(DTMPORT *pp ) #else static int make_out_connections( pp ) DTMPORT *pp; #endif { Outport * pcur; FOR_EACH_OUT_PORT( pcur, pp ) { if( pcur->connfd == DTM_NO_CONNECTION) { if( dtm_connect( &pcur->sockaddr, &pcur->connfd ) == DTMERROR ) { DBGFLOW( "make_out_connections: dtm_connect fails \n" ); return DTMERROR; } } } return DTM_OK; } /* clear_write_flags() Reset the sequence flags on write ports. */ #ifdef DTM_PROTOTYPES static int clear_write_flags(DTMPORT *pp ) #else static int clear_write_flags( pp ) DTMPORT *pp; #endif { Outport * pcur; FOR_EACH_OUT_PORT( pcur, pp ) { pcur->seqstart = FALSE ; pcur->availwrite = FALSE ; } return DTMNOERR; } /* verify_out_connections() Ensure that their is a connection on each out port. */ #ifdef DTM_PROTOTYPES static int verify_out_connections(DTMPORT *pp ) #else static int verify_out_connections( pp ) DTMPORT *pp; #endif { Outport * pcur; FOR_EACH_OUT_PORT( pcur, pp ) { if( pcur->connfd == DTM_NO_CONNECTION) { DTMerrno = DTMPORTINIT; return DTMERROR; } } return DTM_OK; } /* check_header_write_ack() !!!! Check to see whether a header write acknowledge is required, or has come and read it. Since there are no more header ack, this function clears the CTS. */ #ifdef DTM_PROTOTYPES static int check_header_write_ack(DTMPORT *pp ) #else static int check_header_write_ack( pp ) DTMPORT *pp; #endif { Outport * pcur; FOR_EACH_OUT_PORT( pcur, pp ) { if ( pcur->connfd == DTM_NO_CONNECTION ) continue; if( !( pcur->availwrite ) ) { int32 tmp; if( (pp->qservice == DTM_SYNC) || ((pp->qservice == DTM_ASYNC) && (dtm_select( pcur->connfd, &tmp, 0 ) == TRUE && tmp >= 4))) { do { int temp; if( dtm_recv_ack( pcur->connfd, &temp ) == DTMERROR) { DBGFLOW( "Incorrect ack for header\n" ); CHECK_ERR( destroy_out_port( pp, &pcur )); /* Do we return an error here? Do we unlink the port? 
return DTMERROR ; */ } } while ( dtm_select( pcur->connfd, &tmp, 0 ) == TRUE && tmp >= 4); pcur->availwrite = TRUE; } } } return DTM_OK; } /* make_write_iov() returns: The iov[0] = start seq, iov[1,2] = headersize, header iov[3,4] = datasetsize, dataset iov[5] = end seq */ #define START_SEQ TRUE #define NO_START_SEQ FALSE #define END_SEQ TRUE #define NO_END_SEQ FALSE #ifdef DTM_PROTOTYPES static void make_write_iov(IOV_BUF *iov,int fStartSeq,int fEndSeq,char *hdr, int hdrsize,VOIDPTR data,int datasize ) #else static void make_write_iov( iov, fStartSeq, fEndSeq, hdr, hdrsize, data, datasize ) IOV_BUF *iov; int fStartSeq; int fEndSeq; char *hdr ; int hdrsize ; VOIDPTR data ; int datasize ; #endif { int status ; int i ; /* Send "Sequence starts" message */ i = 0 ; iov->iovsize = 0; if ( fStartSeq ) { DBGMSG( "make_write_iov: making start seq\n" ); iov->rts_data = DTM_RTS; STDINT( iov->rts_data ); iov->iovec[ i ].iov_base = (char *)&iov->rts_data ; #define SEQ_START_LEN 4 iov->iovec[ i ].iov_len = SEQ_START_LEN; i += 1 ; iov->iovsize += 4; } /* Prepare header size and header */ if ( hdrsize != 0 ) { DBGMSG( "make_write_iov: making header\n" ); iov->hdr_size = hdrsize ; STDINT( iov->hdr_size ); iov->iovec[ i ].iov_base = (char *)&iov->hdr_size ; iov->iovec[ i ].iov_len = 4 ; i += 1 ; iov->iovec[ i ].iov_base = hdr ; iov->iovec[ i ].iov_len = hdrsize ; i += 1 ; iov->iovsize += 4 + hdrsize; } /* Prepare data size and data */ if ( datasize != 0 ) { DBGMSG( "make_write_iov: making dataset\n" ); iov->data_size = datasize ; STDINT( iov->data_size ); iov->iovec[ i ].iov_base = (char *)&iov->data_size ; iov->iovec[ i ].iov_len = 4 ; i += 1 ; iov->iovec[ i ].iov_base = (char *)data ; iov->iovec[ i ].iov_len = datasize ; i += 1 ; iov->iovsize += 4 + datasize; } if ( fEndSeq ) { /* End sequence */ DBGMSG( "make_write_iov: making endseq\n" ); iov->end_data = DTM_EOT ; STDINT( iov->end_data ); iov->iovec[ i ].iov_base = (char *)&iov->end_data ; iov->iovec[ i ].iov_len = 4 ; i += 1 ; 
iov->iovsize += 4; } iov->iovlen = i; } #ifdef DTM_PROTOTYPES static int writev_buffer(DTMPORT *pp,IOV_BUF *iov_buf,int fStartSeq ) #else static int writev_buffer( pp, iov_buf, fStartSeq ) DTMPORT * pp; IOV_BUF * iov_buf; int fStartSeq; #endif { Outport * pcur; struct iovec * iov; int32 iovlen; int32 iovsize; FOR_EACH_OUT_PORT( pcur, pp ) { int status; if ( pcur->connfd == DTM_NO_CONNECTION ) continue; iov = &iov_buf->iovec[0]; iovlen = iov_buf->iovlen; iovsize = iov_buf->iovsize; if ( fStartSeq ) { if (pcur->availwrite || pcur->seqstart ) { DBGMSG1( "writev: dropping start seq = %x\n", (pcur->availwrite?1:0) | (pcur->seqstart?10:0)); /* we have already sent the sequence start, skip it */ iov++; iovsize -= SEQ_START_LEN; iovlen -= 1; } pcur->seqstart = TRUE; } DBGMSG1( "writev_buffer: iovlen = %d\n", iovlen ); DBGMSG1( "writev_buffer: iovsize = %d\n", iovsize ); DBGMSG1( "writev_buffer: ptr iov = %X\n", iov ); DBGMSG1( "writev_buffer: first ptr word = %X\n", iov[0].iov_base ); DBGMSG1( "writev_buffer: first word = %d\n", *(int *)((iov[0]).iov_base)); status = dtm_writev_buffer( pcur->connfd, iov, iovlen, iovsize, NULL, 0); DBGINT( "writev_buffer - status = %d\n", status); if( status < 0 ) { DBGINT( "dtm_writev_buffer - errno = %d\n", errno ); if( DTMerrno == DTMEOF ) { CHECK_ERR( destroy_out_port( pp, &pcur )); continue; } /* What do we do with the other errors? It is unclear whether we want to bail or not -john return DTMERROR; */ } } /* end while */ return DTM_OK; } /* select_one() Select on a single socket. 
*/ #ifdef DTM_PROTOTYPES static int32 select_one(int connfd ) #else static int32 select_one( connfd ) int connfd; #endif { static struct timeval timeout = {0L, 0L}; fd_set readmask; int32 ret; FD_ZERO( &readmask ); FD_SET( connfd, &readmask ); #ifdef __hpux ret = select( FD_SETSIZE, (int *)&readmask, (int *)0, (int *)0, #else ret = select( FD_SETSIZE, &readmask, (fd_set *)0, (fd_set *)0, #endif &timeout ); if ( ret > 0 ) { int32 count; ioctl( connfd, FIONREAD, &count ); DBGMSG1( "select_one: got count = %d\n", count ); ret = count; } return ret; } /* select_one() Select on a single socket. */ #ifdef DTM_PROTOTYPES static int select_one_connection(int connfd ) #else static int select_one_connection( connfd ) int connfd; #endif { static struct timeval timeout = {0L, 0L}; fd_set readmask; int ret; FD_ZERO( &readmask ); FD_SET( connfd, &readmask ); #ifdef __hpux return select( FD_SETSIZE, (int *)&readmask, (int *)0, (int *)0, #else return select( FD_SETSIZE, &readmask, (fd_set *)0, (fd_set *)0, #endif &timeout ); } /* new_in_ports() Add a new inport on a DTMPORT. 
*/ #ifdef DTM_PROTOTYPES static Inport * new_in_port(DTMPORT *pp,int fd ) #else static Inport * new_in_port( pp, fd ) DTMPORT * pp; int fd; #endif { Inport * inp; if ( (inp = (Inport *) malloc( sizeof(Inport) )) == NULL ) { DTMerrno = DTMMEM; return (Inport *) DTMERROR; } memset(inp,0,sizeof(Inport)); /* #ifdef SOLARIS memset(inp,0,sizeof(Inport)); #else bzero( inp, sizeof(Inport) ); #endif */ inp->fd = fd; inp->blocklen = DTM_NEW_DATASET; inp->fCTSsent = FALSE; #define PUT_NEW_IN_PORTS_AT_END #ifdef PUT_NEW_IN_PORTS_AT_END { Inport * endp; endp = pp->in; if ( endp == NULL ) pp->in = inp; else { while ( endp->next != NULL ) endp = endp->next; endp->next = inp; } } #else inp->next = pp->in; pp->in = inp; #endif return inp; } #ifdef DTM_PROTOTYPES void dtm_handle_in( void * client_data, int * fd, void * id) #else void dtm_handle_in( client_data, fd, id ) void * client_data; int * fd; void * id; #endif { int p = (int) client_data; DTMPORT * pp = DTMpt[p]; int p_ext = p; dtm_map_port_external( &p_ext ); if ( DTMavailRead( p_ext ) == DTM_PORT_READY ) pp->Xcallback( pp->Xcallback_data, &p_ext, id ); } /* dtm_set_Xcallback This function may seem a little strange, after all why have a variable (pp->XaddInput) which has only one valid value (XtAddInput). The problem is that we don't want to include the X libraries unless we have to. By using this variable which is only set if the function that will cause this function to get called is included... which causes the inclusion of the X libraries, we avoid the undefined external error. */ /* Change this if the test in x.c fails! 
*/ #define XtInputReadMask (1L<<0) #ifdef DTM_PROTOTYPES void dtm_set_Xcallback( DTMPORT * pp, Inport * inp ) #else void dtm_set_Xcallback( pp, inp ) DTMPORT * pp; Inport * inp; #endif { /* you didn't see this */ int p; for ( p = 0; p < DTMptCount ; p++ ) if ( pp == DTMpt[p] ) break; if ( pp->porttype == INPORTTYPE && pp->XaddInput ) { inp->XinputId = pp->XaddInput( inp->fd, XtInputReadMask, dtm_handle_in, (void *) p); } } /* dtm_accept_read_connections() If fWait is TRUE: then if there are no connections wait for 2 minutes for a connection before failing. If there are some connections, add any new ones without waiting. If fWait is FALSE: Add any new connections without waiting. */ #ifdef DTM_PROTOTYPES int dtm_accept_read_connections(DTMPORT *pp,int fWait ) #else int dtm_accept_read_connections( pp, fWait ) DTMPORT * pp; int fWait; #endif { struct timeval timeout ; reg int fd; while ( TRUE ) { Inport * inp; fWait = fWait && (pp->in == NULL); /* Do we have any waiting or will be wait? */ if ( !fWait && (select_one_connection( pp->sockfd ) < 1 )) break; /* Wait only 2 minutes for the first connection request */ timeout.tv_sec = 120 ; timeout.tv_usec = 0 ; /* No connection yet, await one and accept */ DBGINT( "dtm_accept_read_connection: pp -> sockfd = %d\n", pp -> sockfd ); if( (fd = dtm_accept( pp->sockfd, &pp->sockaddr, fWait ? &timeout : 0)) == DTMERROR ) { if ( !fWait ) return DTM_OK; DTMerrno = DTMTIMEOUT ; return DTMERROR; } DBGINT( "dtm_accept_read_connection: got fd = %d\n", fd ); CHECK_ERR( inp = new_in_port( pp, fd )); #ifndef _ARCH_MACOS if ( pp->callback ) dtm_sigio( fd ); if ( pp->Xcallback ) dtm_set_Xcallback( pp, inp ); #endif } /* If we have accepted new read connections, reset the nextToRead pointer. */ pp->nextToRead = pp->in; return DTM_OK; } /* DTMselectRead() Function to test a) for existence of a new connection or a new message header to be read on a set of DTM ports OR b) for whether data is available to be read on a set of sockets. 
Return values : DTM_PORT_READY if at least a DTM port or socket has something to be read. DTM_PORT_NOT_READY if no DTM port or socket has something to be read. DTMERROR if select system call returns error. Each port has status field. Possible values for status field are - DTM_PORT_READY something available to be read. DTM_PORT_NOT_READY nothing available to be read. DTMERROR port not initialised. */ #ifdef DTM_PROTOTYPES int DTMselectRead( Dtm_set *dtmset,int dtmnum,Sock_set *sockset,int socknum, int period ) #else int DTMselectRead( dtmset, dtmnum, sockset, socknum, period ) Dtm_set *dtmset ; /* Pointer to set of DTM ports */ int dtmnum ; /* Number of DTM ports to be checked for */ Sock_set *sockset ; /* Pointer to set of sockets */ int socknum ; /* Number of sockets to be checked for */ int period ; #endif { fd_set readmask ; fd_set *fchk = &readmask ; int nf ; int index ; int fReady; Dtm_set *p1 ; Sock_set *p2 ; struct timeval timeout ; int fNewConnections; int fFalsePositive; DBGFLOW( "DTMselectRead called\n" ); timeout.tv_sec = period/1000 ; timeout.tv_usec = (period - (period/1000)*1000)*1000 ; do { fNewConnections = FALSE; fFalsePositive = FALSE; fReady = DTM_PORT_NOT_READY; FD_ZERO( fchk ); /* Set up DTM ports to be selected on */ for( p1 = dtmset, index = 0 ; index < dtmnum ; index++, p1++ ) { reg DTMPORT *pp ; int port_internal; reg Inport *inp; /* Select status is error if port entry is not initialised. 
*/ if( (port_internal = dtm_map_port_internal( p1->port )) == DTMERROR ) { p1->status = DTMERROR ; continue ; } pp = DTMpt[ port_internal ]; CHECK_ERR( dtm_accept_read_connections( pp, DTM_DONT_WAIT)); /* look for new connection request */ FD_SET( pp -> sockfd, fchk ); /* look for data in existing connection (if it exists) */ FOR_EACH_IN_PORT( inp, pp ) { FD_SET( inp->fd, fchk ); } p1->status = DTM_PORT_NOT_READY ; } /* Set up socket fds to be selected on */ for( p2 = sockset, index = 0 ; index < socknum ; index++, p2++ ) { FD_SET( p2 -> sockfd, fchk ); p2 -> status = DTM_PORT_NOT_READY ; } #ifdef __hpux nf = select( FD_SETSIZE, (int *)fchk, (int *)0, (int *)0, #else nf = select( FD_SETSIZE, fchk, (fd_set *)0, (fd_set *)0, #endif period < 0 ? NULL : &timeout ); /* Select returns error */ if( nf < 0 ) { DBGINT( "DTMselectRead: select error %d \n", errno ); DTMerrno = DTMSELECT ; return DTMERROR ; } /* None of the DTM ports or sockets have anything to be read */ if( nf == 0 ) { DBGFLOW( "DTMselectRead: Nothing to read\n" ); return DTM_PORT_NOT_READY ; } /* Check whether any DTM port has something to be read */ for( p1 = dtmset, index = 0 ; index < dtmnum ; index++, p1++ ) { reg DTMPORT *pp ; auto int port_internal; reg Inport *inp; if ((port_internal= dtm_map_port_internal( p1->port )) == DTMERROR) continue; pp = DTMpt[ port_internal ]; if (pp->porttype == INPORTTYPE) { fNewConnections = fNewConnections || (select_one_connection( pp->sockfd ) > 0); p1->status = DTM_PORT_NOT_READY; } else { if (select_one_connection( pp->sockfd ) > 0) fReady = p1->status = DTM_PORT_READY; else p1->status = DTM_PORT_NOT_READY; continue; } inp = pp->in; while ( inp != NULL ) { if ( FD_ISSET( inp->fd, fchk )) { if ( select_one( inp->fd ) < 1 ) { dtm_destroy_in_port( inp, pp ); fFalsePositive = TRUE; inp = pp->in; continue; } p1->status = DTM_PORT_READY; } inp = inp->next; } if ( p1->status == DTM_PORT_READY ) fReady = DTM_PORT_READY; } /* Check whether any socket has something to be read 
*/ for( p2 = sockset, index = 0 ; index < socknum ; index++, p2++ ) { p2 -> status = FD_ISSET( p2 -> sockfd, fchk ) ? DTM_PORT_READY : DTM_PORT_NOT_READY ; if ( p2->status == DTM_PORT_READY ) fReady = DTM_PORT_READY; } DBGFLOW( "DTMselectRead done loop\n" ); } while (!fReady && (fNewConnections || fFalsePositive)); return fReady ; } #ifdef DTM_PROTOTYPES static Inport * inc_in_port(DTMPORT *pp,Inport *inp ) #else static Inport * inc_in_port( pp, inp ) DTMPORT * pp; Inport * inp; #endif { if ( inp == NULL || inp->next == NULL ) return pp->in; else return inp->next; } #ifdef DTM_PROTOTYPES static void inc_nextToRead(DTMPORT *pp ) #else static void inc_nextToRead( pp ) DTMPORT * pp; #endif { pp->nextToRead = inc_in_port( pp, pp->nextToRead ); } /* dtm_destory_in_port() Close, unlink and delete an inport. */ int dtm_destroy_in_port( inp, pp ) Inport * inp; DTMPORT * pp; { DBGMSG1( "dtm_destroy_in_port on %d\n", inp->fd ); if ( pp->Xcallback ) pp->XremoveInput( inp->XinputId ); close( inp->fd ); if ( pp->nextToRead == inp ) inc_nextToRead( pp ); if ( pp->nextToRead == inp ) pp->nextToRead = NULL; if ( pp->in == inp ) pp->in = inp->next; else { Inport * inpTemp; FOR_EACH_IN_PORT( inpTemp, pp ) { if ( inpTemp->next == inp ) { inpTemp->next = inp->next; break; } } } #ifdef FREE_RETURNS_INT if ( free( inp ) != 1 ) { DBGMSG( "dtm_destroy_in_port free error\n" ); DTMerrno = DTMCORPT; return DTMERROR; } #else free( inp ); #endif DBGMSG( "dtm_destroy_in_port done\n" ); return DTM_OK; } /* send_cts() Send CTS to the next port depending on the option DTMSendRTSAhead. 
*/ #ifdef DTM_PROTOTYPES static int send_cts(DTMPORT *pp,int fWait ) #else static int send_cts( pp, fWait ) DTMPORT * pp; int fWait; #endif { Inport * inp = pp->nextToRead; Inport * endp; int iSent = 0; DBGMSG( "send_cts: <[\n" ); /* If we have no ports return OK */ if ( inp == NULL ) inp = pp->in; if ( inp == NULL ) return DTM_OK; /* If we have no ports with RTS pending return OK */ endp = inp; while ( !inp->fCTSsent && (select_one( inp->fd ) < 1 ) ) { inp = inc_in_port( pp, inp ); if ( inp == endp ) { if ( !fWait ) return DTM_OK; else break; } } pp->nextToRead = inp; while ( iSent++ <= DTMSendCTSAhead ) { int32 tmp; /* If we need to send CTS and we are supposed to send it and we are on the first one or if it is ready, send it. */ DBGMSG1( "send_cts: while loop port %X\n", inp ); DBGMSG1( "send_cts: while loop port fd %d\n", inp->fd ); if ( !inp->fCTSsent && ((fWait && (iSent == 1 )) || (dtm_select( inp->fd, &tmp, 0 ) == TRUE && tmp >= 4))) { if ( dtm_send_ack( inp->fd, DTM_CTS ) == DTMERROR) { CHECK_ERR( dtm_destroy_in_port( inp, pp )); /* Never hurts to start at the top. 
*/ inp = pp->nextToRead; /* We just lost our last port */ if ( inp == NULL ) { DBGMSG( "send_cts: done ]>\n" ); DTMerrno = DTMEOF; return DTMERROR; } iSent = 0; continue; } inp->fCTSsent = TRUE; inp->blocklen = DTM_NEW_DATASET; } inp = inc_in_port( pp, inp ); } DBGMSG( "send_cts: ]>\n" ); return DTM_OK; } #ifdef DTM_PROTOTYPES static int accept_one_header(DTMPORT *pp,void *header,int size ) #else static int accept_one_header( pp, header, size ) DTMPORT * pp; void * header; int size; #endif { Inport * inp = pp->nextToRead; int count; int ack ; if ( inp == NULL || !inp->fCTSsent || inp->fGotHeader ) { DTMerrno = DTMCALL; return DTMERROR; } DBGMSG1( "Accepting RTS on %d\n", inp->fd ); if (dtm_recv_ack( inp->fd, &ack ) == DTMERROR ) { dtm_destroy_in_port( inp, pp ); return DTMERROR; } if( ack != DTM_RTS ) { DTMerrno = DTMBADACK; DBGMSG1( "Something other than RTS received %d\n", ack ); dtm_destroy_in_port( inp, pp ); return DTMERROR; } #if 0 /* There are no header ack */ if ( dtm_send_ack( inp->fd, DTM_CTS ) == DTMERROR) { dtm_destroy_in_port( inp, pp ); return DTMERROR; } #endif DBGINT( "Accepting header on %d\n", inp->fd ) ; if( (count = dtm_read_header( inp->fd, header, size )) < 0 ) { DBGINT( "Recv header error = %d\n", errno ); if( DTMerrno != DTMHEADER ) { dtm_destroy_in_port( inp, pp ); } return DTMERROR; } inp->fGotHeader = TRUE; return count; } /* DTMcheckRoute() Check whether new routing information has come in. 
Returns: */ #ifdef DTM_PROTOTYPES int DTMcheckRoute(int port ) #else int DTMcheckRoute( port ) int port; #endif { CHECK_ERR( port = dtm_map_port_internal( port )); /* if logical port, check for routing message from server */ if (DTMpt[port]->fLogical) return dtm_check_server( DTMpt[port], DTM_DONT_WAIT ); /* if absolute port, return TRUE the first time and FALSE thereafter */ else if (DTMpt[port]->fGotList) return FALSE; else return DTMpt[port]->fGotList = TRUE; } /* Function to test for existence of a new connection or a new message header to be read on a given DTM port. Return values : TRUE if either new connection request or something new to be read is available on given port. DTMERROR on select error. FALSE otherwise. Notes : DTMavailRead is basically call to DTMselectRead for given port with 0 timeout. */ #ifdef DTM_PROTOTYPES int DTMavailRead(int p ) #else int DTMavailRead( p ) int p ; #endif { Dtm_set dtmset ; int dtmnum ; Sock_set sockset ; int socknum ; int fAnyReady; DBGFLOW( "DTMavailRead called\n" ); DTMerrno = DTMNOERR; /* Note: the port here is an external port since that is what selectRead expects */ dtmnum = 1 ; dtmset.port = p ; socknum = 0 ; CHECK_ERR(fAnyReady = DTMselectRead( &dtmset,dtmnum,&sockset,socknum,0)); DBGMSG1( "DTMselectRead returned %d\n", fAnyReady ); CHECK_ERR( p = dtm_map_port_internal( p )); if ( fAnyReady ) { if ( send_cts( DTMpt[p], DTM_DONT_WAIT ) == DTMERROR ) { DBGMSG1( "DTMavailRead send_cts returned error %d\n", DTMerrno ); if ( DTMerrno == DTMEOF ) fAnyReady = FALSE ; else return DTMERROR; } } DBGMSG( "DTMavailRead done\n" ); return fAnyReady; } /* Function to add a socket to a DTM inport. 
*/ #ifdef DTM_PROTOTYPES int DTMaddInPortSocket(int p,int socket ) #else int DTMaddInPortSocket( p, socket ) int p; int socket; #endif { CHECK_ERR( p = dtm_map_port_internal( p )); CHECK_ERR( new_in_port( DTMpt[p], socket )); return DTM_OK; } #ifdef DTM_PROTOTYPES int DTMgetConnectionCount(int port,int *n_connections) #else int DTMgetConnectionCount(port, n_connections) int port; int * n_connections; #endif { int count = 0; reg DTMPORT * pp; CHECK_ERR( port = dtm_map_port_internal( port )); pp = DTMpt[port]; if ( pp->porttype != INPORTTYPE ) { reg Outport * pcur; FOR_EACH_OUT_PORT( pcur, pp ) { count++; } } else { reg Inport * pcur; FOR_EACH_IN_PORT( pcur, pp ) { count++; } } return count; } /* Function to begin reading on a DTM port. The header of the message is read. Return values : >= 0 on success. DTMERROR on some error. */ #ifdef DTM_PROTOTYPES int DTMbeginRead(int p,VOIDPTR header,int size ) #else int DTMbeginRead( p, header, size ) int p ; VOIDPTR header ; int size ; #endif { int count = 0; reg DTMPORT *pp; reg Inport *inp; DBGFLOW( "DTMbeginRead called.\n" ); DTMerrno = DTMNOERR; CHECK_ERR( p = dtm_map_port_internal( p )); pp = DTMpt[p]; while ( TRUE ) { CHECK_ERR( dtm_accept_read_connections( pp, DTM_WAIT)); if ( send_cts( pp, DTM_WAIT ) == DTMERROR) { if ( DTMerrno == DTMEOF ) continue; return DTMERROR; } if ( pp->nextToRead == NULL ) continue; DBGFLOW( "DTMbeginRead before accept_one_header.\n" ); if (( count = accept_one_header( pp, header, size )) == DTMERROR ) { if ( DTMerrno == DTMEOF ) continue; return DTMERROR; } break; } DBGFLOW( "DTMbeginRead done.\n" ); return count; } /* DTMreadDataset() Function to read user messages. 
Return values : number of bytes read, on success DTMERROR on error */ #ifdef DTM_PROTOTYPES int DTMreadDataset(int p,VOIDPTR ds,int size,DTMTYPE type) #else int DTMreadDataset(p, ds, size, type) int p ; VOIDPTR ds; int size ; DTMTYPE type; #endif { DTMPORT * pp; Inport * inp; DBGFLOW("DTMreadDataset called.\n"); DTMerrno = DTMNOERR; CHECK_ERR( p = dtm_map_port_internal( p )); pp = DTMpt[p]; inp = pp->nextToRead; if ( inp == NULL || !inp->fCTSsent || !inp->fGotHeader ) { DTMerrno = DTMCALL; DTMERR( "DTMreadDataset: DTMbeginRead required before DTMreadDataset."); return DTMERROR; } /* determine max number of bytes that can be read */ size = (*DTMconvertRtns[(int)type])(DTMSTD, NULL, size); /* fill buffer from network */ /* Assume that the caller has checked for EOT */ CHECK_ERR( size = dtm_read_buffer( inp->fd, &inp->blocklen, ds, size)); /* convert dataset to local representation */ return (*DTMconvertRtns[(int)type])(DTMLOCAL, ds, size); } /* DTMendRead() Function to end reading user messages. Return values : 0 on no error DTMERROR on error */ #ifdef DTM_PROTOTYPES int DTMendRead(int p ) #else int DTMendRead( p ) int p ; #endif { DTMPORT * pp; Inport * inp; DBGFLOW("DTMendRead called.\n"); DTMerrno = DTMNOERR; CHECK_ERR( p = dtm_map_port_internal( p )); pp = DTMpt[p]; inp = pp->nextToRead; /* check that DTMbeginRead has been called */ if ( inp == NULL || !inp->fCTSsent || !inp->fGotHeader ) { DTMerrno = DTMCALL; DTMERR( "DTMendRead: DTMbeginRead required before DTMendRead."); return DTMERROR; } /* discard any remaining data */ while ( dtm_read_buffer( inp->fd, &inp->blocklen, dtm_discard, DISCARDSIZE) > 0 ); inp->fCTSsent = FALSE; inp->fGotHeader = FALSE; return DTM_OK ; } /* Function to combine reading of header/data. Return values : number of bytes read, on success DTMERROR on error Notes : This function is really there for completeness sake ( it complements DTMwriteMsg ). 
It is not very clear how a user can use it effectively ( since he has to know data size beforehand, in which case he need not have a header ). Hence, implementation of this function is to just call beginRead, readDataset and endRead in that order. */ #ifdef DTM_PROTOTYPES int DTMreadMsg(int p,char *hdr,int hdrsize,VOIDPTR data,int datasize, int datatype ) #else int DTMreadMsg( p, hdr, hdrsize, data, datasize, datatype ) int p ; char *hdr ; int hdrsize ; VOIDPTR data ; int datasize ; int datatype ; #endif { int count; DTMerrno = DTMNOERR ; /* Note: all ports given here are external ports */ CHECK_ERR( count = DTMbeginRead( p, hdr, hdrsize )); DBGMSG1( "readMsg got header = %d\n", count ); CHECK_ERR( count = DTMreadDataset( p, data, datasize, datatype )); CHECK_ERR( DTMendRead( p )); return count ; } /* DTMavailWrite() Test whether a subsequent beginWrite( or writeMsg ) will (definitely) succeed or not. Return values : TRUE if subsequent write will succeed. FALSE subsequent write will wait/fail. DTMERROR if port is not initialised or server ( UDP/TCP ) port not yet acquired etc. */ #ifdef DTM_PROTOTYPES int DTMavailWrite(int port ) #else int DTMavailWrite( port ) int port ; #endif { int rstatus = DTM_PORT_NOT_READY; int err_count ; Outport *pcur ; reg DTMPORT *pp; DBGFLOW( "DTMavailWrite called.\n" ); DTMerrno = DTMNOERR; CHECK_ERR( port = dtm_map_port_internal( port )); pp = DTMpt[port]; CHECK_ERR( dtm_check_server( pp, DTM_DONT_WAIT )); /* If we have been told to discard the output, we are ready */ if ( pp->fDiscard ) return TRUE; /* If we have no connections, then we are not ready. 
*/ if ( pp->out == NULL ) return FALSE; /* For all ports in list */ err_count = 0 ; rstatus = DTM_PORT_READY ; FOR_EACH_OUT_PORT( pcur, pp ) { /* Connect to all new active sockets */ if( pcur->connfd == DTM_NO_CONNECTION ) { if( dtm_quick_connect( &pcur->sockaddr, &pcur->connfd ) == DTMERROR ) { ++err_count ; continue ; } } DBGINT( "DTMavailWrite: availWrite = %d\n", pcur -> availwrite ); DBGINT( "DTMavailWrite: seqstart = %d\n", pcur -> seqstart ); if( !(pcur->availwrite) ) { int ack ; int nf ; if ( pp->qservice == DTM_SYNC && !(pcur->seqstart) ) { /* send RTS to new sockets */ if( dtm_send_ack( pcur->connfd, DTM_RTS ) == DTMERROR ) { if( DTMerrno == DTMEOF ) CHECK_ERR( destroy_out_port( pp, &pcur )); ++err_count ; continue ; } pcur->seqstart = TRUE ; } nf = select_one( pcur->connfd ); if( nf < 0 ) { CHECK_ERR( destroy_out_port( pp, &pcur )); DTMerrno = DTMSELECT ; ++err_count ; continue ; } /* No ack yet */ if( nf == 0 ) { if( pp->qservice == DTM_SYNC ) rstatus = DTM_PORT_NOT_READY ; continue ; } /* Receive ack */ if ( dtm_recv_ack( pcur->connfd, &ack ) == DTMERROR) { DBGFLOW( "Incorrect ack for header\n" ); CHECK_ERR( destroy_out_port( pp, &pcur )); ++err_count ; continue ; } /* port is available for write */ if ( pp->qservice == DTM_SYNC ) pcur->availwrite = TRUE; } } /* end while */ /* At some future point we may want to send the status of err_count to the server. */ pp->fLastWasSuccessfulAvailWrite = ( err_count == 0 ) && ( rstatus == DTM_PORT_READY ); return ( err_count != 0 ) ? DTM_PORT_NOT_READY : rstatus ; } /* Function to write user's header. Return values : same as intWriteMsg(). 
*/ #ifdef DTM_PROTOTYPES int DTMbeginWrite(int port,VOIDPTR header,int size ) #else int DTMbeginWrite( port, header, size ) int port ; VOIDPTR header; int size ; #endif { DTMPORT *pp; IOV_BUF iov_buf; CHECK_ERR( port = dtm_map_port_internal( port )); pp = DTMpt[port]; if ( pp->fDiscard ) { CHECK_ERR( dtm_check_server( pp, DTM_DONT_WAIT )); if ( pp->fDiscard ) return DTM_OK; } if ( !pp->fLastWasSuccessfulAvailWrite ) CHECK_ERR( dtm_check_server( pp, DTM_WAIT )); CHECK_ERR( make_out_connections( pp )); make_write_iov( &iov_buf, START_SEQ, NO_END_SEQ, header, size, NULL, 0 ); DBGMSG1( "DTMbeginWrite: before writev_buffer with %d ports\n", outp_count( pp )); CHECK_ERR( writev_buffer( pp, &iov_buf, START_SEQ )); pp->fLastWasSuccessfulAvailWrite = FALSE; CHECK_ERR( check_header_write_ack( pp )); return DTM_OK; } /* Function to write user's data. Return values : same as intWriteMsg(). */ #ifdef DTM_PROTOTYPES int DTMwriteDataset(int p,VOIDPTR ds,int size,DTMTYPE type) #else int DTMwriteDataset(p, ds, size, type) int p; VOIDPTR ds; int size; DTMTYPE type; #endif { DTMPORT * pp; IOV_BUF iov_buf; CHECK_ERR( p = dtm_map_port_internal( p )); pp = DTMpt[p]; if ( pp->fDiscard ) return DTM_OK; CHECK_ERR( verify_out_connections( pp )); size = (*DTMconvertRtns[(int)type]) ( DTMSTD, ds, size ); make_write_iov( &iov_buf, NO_START_SEQ, NO_END_SEQ, NULL, 0, ds, size); CHECK_ERR( writev_buffer( pp, &iov_buf, NO_START_SEQ)); return DTM_OK; } /* Function to end user's write. Return values : same as intWriteMsg(). 
*/ #ifdef DTM_PROTOTYPES int DTMendWrite(int port ) #else int DTMendWrite( port ) int port; #endif { DTMPORT * pp; IOV_BUF iov_buf; reg Outport * pcur ; CHECK_ERR( port = dtm_map_port_internal( port )); pp = DTMpt[port]; /* Check for endWrite before begin */ FOR_EACH_OUT_PORT( pcur, pp ) { if( pcur->connfd == DTM_NO_CONNECTION ) continue; if ((pp->qservice == DTM_SYNC && !pcur->availwrite) || !pcur->seqstart ) { DTMerrno = DTMCALL; return DTMERROR; } } if ( pp->fDiscard ) return DTM_OK; CHECK_ERR( verify_out_connections( pp )); make_write_iov( &iov_buf, NO_START_SEQ, END_SEQ, NULL, 0, NULL, 0 ); CHECK_ERR( writev_buffer( pp, &iov_buf, NO_START_SEQ )); CHECK_ERR( clear_write_flags( pp )); return DTM_OK; } #ifdef DTM_PROTOTYPES int DTMsizeof(DTMTYPE type ) #else int DTMsizeof( type ) DTMTYPE type; #endif { int size = 1; return (*DTMconvertRtns[(int)type])(DTMSTD, NULL, size); } /* Function to do a complete/composite write in which the user header/data is written and the write ended. Return values : same as intWriteMsg(). */ #ifdef DTM_PROTOTYPES int DTMwriteMsg(int p,char *hdr,int hdrsize,VOIDPTR data,int datasize, DTMTYPE datatype ) #else int DTMwriteMsg( p, hdr, hdrsize, data, datasize, datatype ) int p; char *hdr ; int hdrsize ; VOIDPTR data ; int datasize ; DTMTYPE datatype ; #endif { DTMPORT *pp; IOV_BUF iov_buf; CHECK_ERR( p = dtm_map_port_internal( p )); pp = DTMpt[p]; if ( pp->fDiscard ) { CHECK_ERR( dtm_check_server( pp, DTM_DONT_WAIT )); if ( pp->fDiscard ) return DTM_OK; } if ( !pp->fLastWasSuccessfulAvailWrite ) CHECK_ERR( dtm_check_server( pp, DTM_WAIT )); CHECK_ERR( make_out_connections( pp )); CHECK_ERR( verify_out_connections( pp )); datasize = (*DTMconvertRtns[(int)datatype]) ( DTMSTD, data, datasize ); make_write_iov( &iov_buf, START_SEQ, END_SEQ, hdr, hdrsize, data, datasize); CHECK_ERR( writev_buffer( pp, &iov_buf, START_SEQ )); CHECK_ERR( check_header_write_ack( pp )); CHECK_ERR( clear_write_flags( pp )); return DTM_OK; }