1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
  23  * Use is subject to license terms.
  24  */
  25 
  26 #pragma ident   "%Z%%M% %I%     %E% SMI"
  27 
  28 #include <sys/types.h>
  29 #include <sys/stream.h>
  30 #include <sys/strsun.h>
  31 #include <sys/strsubr.h>
  32 #include <sys/debug.h>
  33 #include <sys/sdt.h>
  34 #include <sys/cmn_err.h>
  35 #include <sys/tihdr.h>
  36 
  37 #include <inet/common.h>
  38 #include <inet/optcom.h>
  39 #include <inet/ip.h>
  40 #include <inet/ip_impl.h>
  41 #include <inet/tcp.h>
  42 #include <inet/tcp_impl.h>
  43 #include <inet/ipsec_impl.h>
  44 #include <inet/ipclassifier.h>
  45 #include <inet/ipp_common.h>
  46 
  47 /*
  48  * This file implements TCP fusion - a protocol-less data path for TCP
  49  * loopback connections.  The fusion of two local TCP endpoints occurs
  50  * at connection establishment time.  Various conditions (see details
  51  * in tcp_fuse()) need to be met for fusion to be successful.  If it
  52  * fails, we fall back to the regular TCP data path; if it succeeds,
  53  * both endpoints proceed to use tcp_fuse_output() as the transmit path.
  54  * tcp_fuse_output() enqueues application data directly onto the peer's
  55  * receive queue; no protocol processing is involved.  After enqueueing
  56  * the data, the sender can either push (putnext) data up the receiver's
  57  * read queue; or the sender can simply return and let the receiver
  58  * retrieve the enqueued data via the synchronous streams entry point
  59  * tcp_fuse_rrw().  The latter path is taken if synchronous streams is
  60  * enabled (the default).  It is disabled if sockfs no longer resides
  61  * directly on top of tcp module due to a module insertion or removal.
  62  * It also needs to be temporarily disabled when sending urgent data
  63  * because the tcp_fuse_rrw() path bypasses the M_PROTO processing done
  64  * by strsock_proto() hook.
  65  *
  66  * Sychronization is handled by squeue and the mutex tcp_non_sq_lock.
  67  * One of the requirements for fusion to succeed is that both endpoints
  68  * need to be using the same squeue.  This ensures that neither side
  69  * can disappear while the other side is still sending data.  By itself,
  70  * squeue is not sufficient for guaranteeing safety when synchronous
  71  * streams is enabled.  The reason is that tcp_fuse_rrw() doesn't enter
  72  * the squeue and its access to tcp_rcv_list and other fusion-related
  73  * fields needs to be sychronized with the sender.  tcp_non_sq_lock is
  74  * used for this purpose.  When there is urgent data, the sender needs
  75  * to push the data up the receiver's streams read queue.  In order to
  76  * avoid holding the tcp_non_sq_lock across putnext(), the sender sets
  77  * the peer tcp's tcp_fuse_syncstr_plugged bit and releases tcp_non_sq_lock
  78  * (see macro TCP_FUSE_SYNCSTR_PLUG_DRAIN()).  If tcp_fuse_rrw() enters
  79  * after this point, it will see that synchronous streams is plugged and
  80  * will wait on tcp_fuse_plugcv.  After the sender has finished pushing up
  81  * all urgent data, it will clear the tcp_fuse_syncstr_plugged bit using
  82  * TCP_FUSE_SYNCSTR_UNPLUG_DRAIN().  This will cause any threads waiting
  83  * on tcp_fuse_plugcv to return EBUSY, and in turn cause strget() to call
  84  * getq_noenab() to dequeue data from the stream head instead.  Once the
  85  * data on the stream head has been consumed, tcp_fuse_rrw() may again
  86  * be used to process tcp_rcv_list.  However, if TCP_FUSE_SYNCSTR_STOP()
  87  * has been called, all future calls to tcp_fuse_rrw() will return EBUSY,
  88  * effectively disabling synchronous streams.
  89  *
  90  * The following note applies only to the synchronous streams mode.
  91  *
  92  * Flow control is done by checking the size of receive buffer and
  93  * the number of data blocks, both set to different limits.  This is
  94  * different than regular streams flow control where cumulative size
  95  * check dominates block count check -- streams queue high water mark
  96  * typically represents bytes.  Each enqueue triggers notifications
  97  * to the receiving process; a build up of data blocks indicates a
  98  * slow receiver and the sender should be blocked or informed at the
  99  * earliest moment instead of further wasting system resources.  In
 100  * effect, this is equivalent to limiting the number of outstanding
 101  * segments in flight.
 102  */
 103 
 104 /*
 105  * Setting this to false means we disable fusion altogether and
 106  * loopback connections would go through the protocol paths.
 107  */
 108 boolean_t do_tcp_fusion = B_TRUE;
 109 
 110 /*
 111  * Enabling this flag allows sockfs to retrieve data directly
 112  * from a fused tcp endpoint using synchronous streams interface.
 113  */
 114 boolean_t do_tcp_direct_sockfs = B_TRUE;
 115 
 116 /*
 117  * This is the minimum amount of outstanding writes allowed on
 118  * a synchronous streams-enabled receiving endpoint before the
 119  * sender gets flow-controlled.  Setting this value to 0 means
 120  * that the data block limit is equivalent to the byte count
 121  * limit, which essentially disables the check.
 122  */
 123 #define TCP_FUSION_RCV_UNREAD_MIN       8
 124 uint_t tcp_fusion_rcv_unread_min = TCP_FUSION_RCV_UNREAD_MIN;
 125 
 126 static void             tcp_fuse_syncstr_enable(tcp_t *);
 127 static void             tcp_fuse_syncstr_disable(tcp_t *);
 128 static boolean_t        strrput_sig(queue_t *, boolean_t);
 129 
 130 /*
 131  * Return true if this connection needs some IP functionality
 132  */
 133 static boolean_t
 134 tcp_loopback_needs_ip(tcp_t *tcp, netstack_t *ns)
 135 {
 136         ipsec_stack_t   *ipss = ns->netstack_ipsec;
 137 
 138         /*
 139          * If ire is not cached, do not use fusion
 140          */
 141         if (tcp->tcp_connp->conn_ire_cache == NULL) {
 142                 /*
 143                  * There is no need to hold conn_lock here because when called
 144                  * from tcp_fuse() there can be no window where conn_ire_cache
 145                  * can change. This is not true whe called from
 146                  * tcp_fuse_output(). conn_ire_cache can become null just
 147                  * after the check, but it's ok if a few packets are delivered
 148                  * in the fused state.
 149                  */
 150                 return (B_TRUE);
 151         }
 152         if (tcp->tcp_ipversion == IPV4_VERSION) {
 153                 if (tcp->tcp_ip_hdr_len != IP_SIMPLE_HDR_LENGTH)
 154                         return (B_TRUE);
 155                 if (CONN_OUTBOUND_POLICY_PRESENT(tcp->tcp_connp, ipss))
 156                         return (B_TRUE);
 157                 if (CONN_INBOUND_POLICY_PRESENT(tcp->tcp_connp, ipss))
 158                         return (B_TRUE);
 159         } else {
 160                 if (tcp->tcp_ip_hdr_len != IPV6_HDR_LEN)
 161                         return (B_TRUE);
 162                 if (CONN_OUTBOUND_POLICY_PRESENT_V6(tcp->tcp_connp, ipss))
 163                         return (B_TRUE);
 164                 if (CONN_INBOUND_POLICY_PRESENT_V6(tcp->tcp_connp, ipss))
 165                         return (B_TRUE);
 166         }
 167         if (!CONN_IS_LSO_MD_FASTPATH(tcp->tcp_connp))
 168                 return (B_TRUE);
 169         return (B_FALSE);
 170 }
 171 
 172 
 173 /*
 174  * This routine gets called by the eager tcp upon changing state from
 175  * SYN_RCVD to ESTABLISHED.  It fuses a direct path between itself
 176  * and the active connect tcp such that the regular tcp processings
 177  * may be bypassed under allowable circumstances.  Because the fusion
 178  * requires both endpoints to be in the same squeue, it does not work
 179  * for simultaneous active connects because there is no easy way to
 180  * switch from one squeue to another once the connection is created.
 181  * This is different from the eager tcp case where we assign it the
 182  * same squeue as the one given to the active connect tcp during open.
 183  */
 184 void
 185 tcp_fuse(tcp_t *tcp, uchar_t *iphdr, tcph_t *tcph)
 186 {
 187         conn_t *peer_connp, *connp = tcp->tcp_connp;
 188         tcp_t *peer_tcp;
 189         tcp_stack_t     *tcps = tcp->tcp_tcps;
 190         netstack_t      *ns;
 191         ip_stack_t      *ipst = tcps->tcps_netstack->netstack_ip;
 192 
 193         ASSERT(!tcp->tcp_fused);
 194         ASSERT(tcp->tcp_loopback);
 195         ASSERT(tcp->tcp_loopback_peer == NULL);
 196         /*
 197          * We need to inherit q_hiwat of the listener tcp, but we can't
 198          * really use tcp_listener since we get here after sending up
 199          * T_CONN_IND and tcp_wput_accept() may be called independently,
 200          * at which point tcp_listener is cleared; this is why we use
 201          * tcp_saved_listener.  The listener itself is guaranteed to be
 202          * around until tcp_accept_finish() is called on this eager --
 203          * this won't happen until we're done since we're inside the
 204          * eager's perimeter now.
 205          */
 206         ASSERT(tcp->tcp_saved_listener != NULL);
 207 
 208         /*
 209          * Lookup peer endpoint; search for the remote endpoint having
 210          * the reversed address-port quadruplet in ESTABLISHED state,
 211          * which is guaranteed to be unique in the system.  Zone check
 212          * is applied accordingly for loopback address, but not for
 213          * local address since we want fusion to happen across Zones.
 214          */
 215         if (tcp->tcp_ipversion == IPV4_VERSION) {
 216                 peer_connp = ipcl_conn_tcp_lookup_reversed_ipv4(connp,
 217                     (ipha_t *)iphdr, tcph, ipst);
 218         } else {
 219                 peer_connp = ipcl_conn_tcp_lookup_reversed_ipv6(connp,
 220                     (ip6_t *)iphdr, tcph, ipst);
 221         }
 222 
 223         /*
 224          * We can only proceed if peer exists, resides in the same squeue
 225          * as our conn and is not raw-socket.  The squeue assignment of
 226          * this eager tcp was done earlier at the time of SYN processing
 227          * in ip_fanout_tcp{_v6}.  Note that similar squeues by itself
 228          * doesn't guarantee a safe condition to fuse, hence we perform
 229          * additional tests below.
 230          */
 231         ASSERT(peer_connp == NULL || peer_connp != connp);
 232         if (peer_connp == NULL || peer_connp->conn_sqp != connp->conn_sqp ||
 233             !IPCL_IS_TCP(peer_connp)) {
 234                 if (peer_connp != NULL) {
 235                         TCP_STAT(tcps, tcp_fusion_unqualified);
 236                         CONN_DEC_REF(peer_connp);
 237                 }
 238                 return;
 239         }
 240         peer_tcp = peer_connp->conn_tcp;     /* active connect tcp */
 241 
 242         ASSERT(peer_tcp != NULL && peer_tcp != tcp && !peer_tcp->tcp_fused);
 243         ASSERT(peer_tcp->tcp_loopback && peer_tcp->tcp_loopback_peer == NULL);
 244         ASSERT(peer_connp->conn_sqp == connp->conn_sqp);
 245 
 246         /*
 247          * Fuse the endpoints; we perform further checks against both
 248          * tcp endpoints to ensure that a fusion is allowed to happen.
 249          * In particular we bail out for non-simple TCP/IP or if IPsec/
 250          * IPQoS policy/kernel SSL exists.
 251          */
 252         ns = tcps->tcps_netstack;
 253         ipst = ns->netstack_ip;
 254 
 255         if (!tcp->tcp_unfusable && !peer_tcp->tcp_unfusable &&
 256             !tcp_loopback_needs_ip(tcp, ns) &&
 257             !tcp_loopback_needs_ip(peer_tcp, ns) &&
 258             tcp->tcp_kssl_ent == NULL &&
 259             !IPP_ENABLED(IPP_LOCAL_OUT|IPP_LOCAL_IN, ipst)) {
 260                 mblk_t *mp;
 261                 struct stroptions *stropt;
 262                 queue_t *peer_rq = peer_tcp->tcp_rq;
 263 
 264                 ASSERT(!TCP_IS_DETACHED(peer_tcp) && peer_rq != NULL);
 265                 ASSERT(tcp->tcp_fused_sigurg_mp == NULL);
 266                 ASSERT(peer_tcp->tcp_fused_sigurg_mp == NULL);
 267                 ASSERT(tcp->tcp_kssl_ctx == NULL);
 268 
 269                 /*
 270                  * We need to drain data on both endpoints during unfuse.
 271                  * If we need to send up SIGURG at the time of draining,
 272                  * we want to be sure that an mblk is readily available.
 273                  * This is why we pre-allocate the M_PCSIG mblks for both
 274                  * endpoints which will only be used during/after unfuse.
 275                  */
 276                 if ((mp = allocb(1, BPRI_HI)) == NULL)
 277                         goto failed;
 278 
 279                 tcp->tcp_fused_sigurg_mp = mp;
 280 
 281                 if ((mp = allocb(1, BPRI_HI)) == NULL)
 282                         goto failed;
 283 
 284                 peer_tcp->tcp_fused_sigurg_mp = mp;
 285 
 286                 /* Allocate M_SETOPTS mblk */
 287                 if ((mp = allocb(sizeof (*stropt), BPRI_HI)) == NULL)
 288                         goto failed;
 289 
 290                 /* If either tcp or peer_tcp sodirect enabled then disable */
 291                 if (tcp->tcp_sodirect != NULL) {
 292                         mutex_enter(tcp->tcp_sodirect->sod_lock);
 293                         SOD_DISABLE(tcp->tcp_sodirect);
 294                         mutex_exit(tcp->tcp_sodirect->sod_lock);
 295                         tcp->tcp_sodirect = NULL;
 296                 }
 297                 if (peer_tcp->tcp_sodirect != NULL) {
 298                         mutex_enter(peer_tcp->tcp_sodirect->sod_lock);
 299                         SOD_DISABLE(peer_tcp->tcp_sodirect);
 300                         mutex_exit(peer_tcp->tcp_sodirect->sod_lock);
 301                         peer_tcp->tcp_sodirect = NULL;
 302                 }
 303 
 304                 /* Fuse both endpoints */
 305                 peer_tcp->tcp_loopback_peer = tcp;
 306                 tcp->tcp_loopback_peer = peer_tcp;
 307                 peer_tcp->tcp_fused = tcp->tcp_fused = B_TRUE;
 308 
 309                 /*
 310                  * We never use regular tcp paths in fusion and should
 311                  * therefore clear tcp_unsent on both endpoints.  Having
 312                  * them set to non-zero values means asking for trouble
 313                  * especially after unfuse, where we may end up sending
 314                  * through regular tcp paths which expect xmit_list and
 315                  * friends to be correctly setup.
 316                  */
 317                 peer_tcp->tcp_unsent = tcp->tcp_unsent = 0;
 318 
 319                 tcp_timers_stop(tcp);
 320                 tcp_timers_stop(peer_tcp);
 321 
 322                 /*
 323                  * At this point we are a detached eager tcp and therefore
 324                  * don't have a queue assigned to us until accept happens.
 325                  * In the mean time the peer endpoint may immediately send
 326                  * us data as soon as fusion is finished, and we need to be
 327                  * able to flow control it in case it sends down huge amount
 328                  * of data while we're still detached.  To prevent that we
 329                  * inherit the listener's q_hiwat value; this is temporary
 330                  * since we'll repeat the process in tcp_accept_finish().
 331                  */
 332                 (void) tcp_fuse_set_rcv_hiwat(tcp,
 333                     tcp->tcp_saved_listener->tcp_rq->q_hiwat);
 334 
 335                 /*
 336                  * Set the stream head's write offset value to zero since we
 337                  * won't be needing any room for TCP/IP headers; tell it to
 338                  * not break up the writes (this would reduce the amount of
 339                  * work done by kmem); and configure our receive buffer.
 340                  * Note that we can only do this for the active connect tcp
 341                  * since our eager is still detached; it will be dealt with
 342                  * later in tcp_accept_finish().
 343                  */
 344                 DB_TYPE(mp) = M_SETOPTS;
 345                 mp->b_wptr += sizeof (*stropt);
 346 
 347                 stropt = (struct stroptions *)mp->b_rptr;
 348                 stropt->so_flags = SO_MAXBLK | SO_WROFF | SO_HIWAT;
 349                 stropt->so_maxblk = tcp_maxpsz_set(peer_tcp, B_FALSE);
 350                 stropt->so_wroff = 0;
 351 
 352                 /*
 353                  * Record the stream head's high water mark for
 354                  * peer endpoint; this is used for flow-control
 355                  * purposes in tcp_fuse_output().
 356                  */
 357                 stropt->so_hiwat = tcp_fuse_set_rcv_hiwat(peer_tcp,
 358                     peer_rq->q_hiwat);
 359 
 360                 /* Send the options up */
 361                 putnext(peer_rq, mp);
 362         } else {
 363                 TCP_STAT(tcps, tcp_fusion_unqualified);
 364         }
 365         CONN_DEC_REF(peer_connp);
 366         return;
 367 
 368 failed:
 369         if (tcp->tcp_fused_sigurg_mp != NULL) {
 370                 freeb(tcp->tcp_fused_sigurg_mp);
 371                 tcp->tcp_fused_sigurg_mp = NULL;
 372         }
 373         if (peer_tcp->tcp_fused_sigurg_mp != NULL) {
 374                 freeb(peer_tcp->tcp_fused_sigurg_mp);
 375                 peer_tcp->tcp_fused_sigurg_mp = NULL;
 376         }
 377         CONN_DEC_REF(peer_connp);
 378 }
 379 
 380 /*
 381  * Unfuse a previously-fused pair of tcp loopback endpoints.
 382  */
 383 void
 384 tcp_unfuse(tcp_t *tcp)
 385 {
 386         tcp_t *peer_tcp = tcp->tcp_loopback_peer;
 387 
 388         ASSERT(tcp->tcp_fused && peer_tcp != NULL);
 389         ASSERT(peer_tcp->tcp_fused && peer_tcp->tcp_loopback_peer == tcp);
 390         ASSERT(tcp->tcp_connp->conn_sqp == peer_tcp->tcp_connp->conn_sqp);
 391         ASSERT(tcp->tcp_unsent == 0 && peer_tcp->tcp_unsent == 0);
 392         ASSERT(tcp->tcp_fused_sigurg_mp != NULL);
 393         ASSERT(peer_tcp->tcp_fused_sigurg_mp != NULL);
 394 
 395         /*
 396          * We disable synchronous streams, drain any queued data and
 397          * clear tcp_direct_sockfs.  The synchronous streams entry
 398          * points will become no-ops after this point.
 399          */
 400         tcp_fuse_disable_pair(tcp, B_TRUE);
 401 
 402         /*
 403          * Update th_seq and th_ack in the header template
 404          */
 405         U32_TO_ABE32(tcp->tcp_snxt, tcp->tcp_tcph->th_seq);
 406         U32_TO_ABE32(tcp->tcp_rnxt, tcp->tcp_tcph->th_ack);
 407         U32_TO_ABE32(peer_tcp->tcp_snxt, peer_tcp->tcp_tcph->th_seq);
 408         U32_TO_ABE32(peer_tcp->tcp_rnxt, peer_tcp->tcp_tcph->th_ack);
 409 
 410         /* Unfuse the endpoints */
 411         peer_tcp->tcp_fused = tcp->tcp_fused = B_FALSE;
 412         peer_tcp->tcp_loopback_peer = tcp->tcp_loopback_peer = NULL;
 413 }
 414 
 415 /*
 416  * Fusion output routine for urgent data.  This routine is called by
 417  * tcp_fuse_output() for handling non-M_DATA mblks.
 418  */
 419 void
 420 tcp_fuse_output_urg(tcp_t *tcp, mblk_t *mp)
 421 {
 422         mblk_t *mp1;
 423         struct T_exdata_ind *tei;
 424         tcp_t *peer_tcp = tcp->tcp_loopback_peer;
 425         mblk_t *head, *prev_head = NULL;
 426         tcp_stack_t     *tcps = tcp->tcp_tcps;
 427 
 428         ASSERT(tcp->tcp_fused);
 429         ASSERT(peer_tcp != NULL && peer_tcp->tcp_loopback_peer == tcp);
 430         ASSERT(DB_TYPE(mp) == M_PROTO || DB_TYPE(mp) == M_PCPROTO);
 431         ASSERT(mp->b_cont != NULL && DB_TYPE(mp->b_cont) == M_DATA);
 432         ASSERT(MBLKL(mp) >= sizeof (*tei) && MBLKL(mp->b_cont) > 0);
 433 
 434         /*
 435          * Urgent data arrives in the form of T_EXDATA_REQ from above.
 436          * Each occurence denotes a new urgent pointer.  For each new
 437          * urgent pointer we signal (SIGURG) the receiving app to indicate
 438          * that it needs to go into urgent mode.  This is similar to the
 439          * urgent data handling in the regular tcp.  We don't need to keep
 440          * track of where the urgent pointer is, because each T_EXDATA_REQ
 441          * "advances" the urgent pointer for us.
 442          *
 443          * The actual urgent data carried by T_EXDATA_REQ is then prepended
 444          * by a T_EXDATA_IND before being enqueued behind any existing data
 445          * destined for the receiving app.  There is only a single urgent
 446          * pointer (out-of-band mark) for a given tcp.  If the new urgent
 447          * data arrives before the receiving app reads some existing urgent
 448          * data, the previous marker is lost.  This behavior is emulated
 449          * accordingly below, by removing any existing T_EXDATA_IND messages
 450          * and essentially converting old urgent data into non-urgent.
 451          */
 452         ASSERT(tcp->tcp_valid_bits & TCP_URG_VALID);
 453         /* Let sender get out of urgent mode */
 454         tcp->tcp_valid_bits &= ~TCP_URG_VALID;
 455 
 456         /*
 457          * This flag indicates that a signal needs to be sent up.
 458          * This flag will only get cleared once SIGURG is delivered and
 459          * is not affected by the tcp_fused flag -- delivery will still
 460          * happen even after an endpoint is unfused, to handle the case
 461          * where the sending endpoint immediately closes/unfuses after
 462          * sending urgent data and the accept is not yet finished.
 463          */
 464         peer_tcp->tcp_fused_sigurg = B_TRUE;
 465 
 466         /* Reuse T_EXDATA_REQ mblk for T_EXDATA_IND */
 467         DB_TYPE(mp) = M_PROTO;
 468         tei = (struct T_exdata_ind *)mp->b_rptr;
 469         tei->PRIM_type = T_EXDATA_IND;
 470         tei->MORE_flag = 0;
 471         mp->b_wptr = (uchar_t *)&tei[1];
 472 
 473         TCP_STAT(tcps, tcp_fusion_urg);
 474         BUMP_MIB(&tcps->tcps_mib, tcpOutUrg);
 475 
 476         head = peer_tcp->tcp_rcv_list;
 477         while (head != NULL) {
 478                 /*
 479                  * Remove existing T_EXDATA_IND, keep the data which follows
 480                  * it and relink our list.  Note that we don't modify the
 481                  * tcp_rcv_last_tail since it never points to T_EXDATA_IND.
 482                  */
 483                 if (DB_TYPE(head) != M_DATA) {
 484                         mp1 = head;
 485 
 486                         ASSERT(DB_TYPE(mp1->b_cont) == M_DATA);
 487                         head = mp1->b_cont;
 488                         mp1->b_cont = NULL;
 489                         head->b_next = mp1->b_next;
 490                         mp1->b_next = NULL;
 491                         if (prev_head != NULL)
 492                                 prev_head->b_next = head;
 493                         if (peer_tcp->tcp_rcv_list == mp1)
 494                                 peer_tcp->tcp_rcv_list = head;
 495                         if (peer_tcp->tcp_rcv_last_head == mp1)
 496                                 peer_tcp->tcp_rcv_last_head = head;
 497                         freeb(mp1);
 498                 }
 499                 prev_head = head;
 500                 head = head->b_next;
 501         }
 502 }
 503 
 504 /*
 505  * Fusion output routine, called by tcp_output() and tcp_wput_proto().
 506  * If we are modifying any member that can be changed outside the squeue,
 507  * like tcp_flow_stopped, we need to take tcp_non_sq_lock.
 508  */
 509 boolean_t
 510 tcp_fuse_output(tcp_t *tcp, mblk_t *mp, uint32_t send_size)
 511 {
 512         tcp_t *peer_tcp = tcp->tcp_loopback_peer;
 513         uint_t max_unread;
 514         boolean_t flow_stopped, peer_data_queued = B_FALSE;
 515         boolean_t urgent = (DB_TYPE(mp) != M_DATA);
 516         mblk_t *mp1 = mp;
 517         ill_t *ilp, *olp;
 518         ipha_t *ipha;
 519         ip6_t *ip6h;
 520         tcph_t *tcph;
 521         uint_t ip_hdr_len;
 522         uint32_t seq;
 523         uint32_t recv_size = send_size;
 524         tcp_stack_t     *tcps = tcp->tcp_tcps;
 525         netstack_t      *ns = tcps->tcps_netstack;
 526         ip_stack_t      *ipst = ns->netstack_ip;
 527 
 528         ASSERT(tcp->tcp_fused);
 529         ASSERT(peer_tcp != NULL && peer_tcp->tcp_loopback_peer == tcp);
 530         ASSERT(tcp->tcp_connp->conn_sqp == peer_tcp->tcp_connp->conn_sqp);
 531         ASSERT(DB_TYPE(mp) == M_DATA || DB_TYPE(mp) == M_PROTO ||
 532             DB_TYPE(mp) == M_PCPROTO);
 533 
 534 
 535         /* If this connection requires IP, unfuse and use regular path */
 536         if (tcp_loopback_needs_ip(tcp, ns) ||
 537             tcp_loopback_needs_ip(peer_tcp, ns) ||
 538             IPP_ENABLED(IPP_LOCAL_OUT|IPP_LOCAL_IN, ipst)) {
 539                 TCP_STAT(tcps, tcp_fusion_aborted);
 540                 goto unfuse;
 541         }
 542 
 543         if (send_size == 0) {
 544                 freemsg(mp);
 545                 return (B_TRUE);
 546         }
 547         max_unread = peer_tcp->tcp_fuse_rcv_unread_hiwater;
 548 
 549         /*
 550          * Handle urgent data; we either send up SIGURG to the peer now
 551          * or do it later when we drain, in case the peer is detached
 552          * or if we're short of memory for M_PCSIG mblk.
 553          */
 554         if (urgent) {
 555                 /*
 556                  * We stop synchronous streams when we have urgent data
 557                  * queued to prevent tcp_fuse_rrw() from pulling it.  If
 558                  * for some reasons the urgent data can't be delivered
 559                  * below, synchronous streams will remain stopped until
 560                  * someone drains the tcp_rcv_list.
 561                  */
 562                 TCP_FUSE_SYNCSTR_PLUG_DRAIN(peer_tcp);
 563                 tcp_fuse_output_urg(tcp, mp);
 564 
 565                 mp1 = mp->b_cont;
 566         }
 567 
 568         if (tcp->tcp_ipversion == IPV4_VERSION &&
 569             (HOOKS4_INTERESTED_LOOPBACK_IN(ipst) ||
 570             HOOKS4_INTERESTED_LOOPBACK_OUT(ipst)) ||
 571             tcp->tcp_ipversion == IPV6_VERSION &&
 572             (HOOKS6_INTERESTED_LOOPBACK_IN(ipst) ||
 573             HOOKS6_INTERESTED_LOOPBACK_OUT(ipst))) {
 574                 /*
 575                  * Build ip and tcp header to satisfy FW_HOOKS.
 576                  * We only build it when any hook is present.
 577                  */
 578                 if ((mp1 = tcp_xmit_mp(tcp, mp1, tcp->tcp_mss, NULL, NULL,
 579                     tcp->tcp_snxt, B_TRUE, NULL, B_FALSE)) == NULL)
 580                         /* If tcp_xmit_mp fails, use regular path */
 581                         goto unfuse;
 582 
 583                 ASSERT(peer_tcp->tcp_connp->conn_ire_cache->ire_ipif != NULL);
 584                 olp = peer_tcp->tcp_connp->conn_ire_cache->ire_ipif->ipif_ill;
 585                 /* PFHooks: LOOPBACK_OUT */
 586                 if (tcp->tcp_ipversion == IPV4_VERSION) {
 587                         ipha = (ipha_t *)mp1->b_rptr;
 588 
 589                         DTRACE_PROBE4(ip4__loopback__out__start,
 590                             ill_t *, NULL, ill_t *, olp,
 591                             ipha_t *, ipha, mblk_t *, mp1);
 592                         FW_HOOKS(ipst->ips_ip4_loopback_out_event,
 593                             ipst->ips_ipv4firewall_loopback_out,
 594                             NULL, olp, ipha, mp1, mp1, 0, ipst);
 595                         DTRACE_PROBE1(ip4__loopback__out__end, mblk_t *, mp1);
 596                 } else {
 597                         ip6h = (ip6_t *)mp1->b_rptr;
 598 
 599                         DTRACE_PROBE4(ip6__loopback__out__start,
 600                             ill_t *, NULL, ill_t *, olp,
 601                             ip6_t *, ip6h, mblk_t *, mp1);
 602                         FW_HOOKS6(ipst->ips_ip6_loopback_out_event,
 603                             ipst->ips_ipv6firewall_loopback_out,
 604                             NULL, olp, ip6h, mp1, mp1, 0, ipst);
 605                         DTRACE_PROBE1(ip6__loopback__out__end, mblk_t *, mp1);
 606                 }
 607                 if (mp1 == NULL)
 608                         goto unfuse;
 609 
 610 
 611                 /* PFHooks: LOOPBACK_IN */
 612                 ASSERT(tcp->tcp_connp->conn_ire_cache->ire_ipif != NULL);
 613                 ilp = tcp->tcp_connp->conn_ire_cache->ire_ipif->ipif_ill;
 614 
 615                 if (tcp->tcp_ipversion == IPV4_VERSION) {
 616                         DTRACE_PROBE4(ip4__loopback__in__start,
 617                             ill_t *, ilp, ill_t *, NULL,
 618                             ipha_t *, ipha, mblk_t *, mp1);
 619                         FW_HOOKS(ipst->ips_ip4_loopback_in_event,
 620                             ipst->ips_ipv4firewall_loopback_in,
 621                             ilp, NULL, ipha, mp1, mp1, 0, ipst);
 622                         DTRACE_PROBE1(ip4__loopback__in__end, mblk_t *, mp1);
 623                         if (mp1 == NULL)
 624                                 goto unfuse;
 625 
 626                         ip_hdr_len = IPH_HDR_LENGTH(ipha);
 627                 } else {
 628                         DTRACE_PROBE4(ip6__loopback__in__start,
 629                             ill_t *, ilp, ill_t *, NULL,
 630                             ip6_t *, ip6h, mblk_t *, mp1);
 631                         FW_HOOKS6(ipst->ips_ip6_loopback_in_event,
 632                             ipst->ips_ipv6firewall_loopback_in,
 633                             ilp, NULL, ip6h, mp1, mp1, 0, ipst);
 634                         DTRACE_PROBE1(ip6__loopback__in__end, mblk_t *, mp1);
 635                         if (mp1 == NULL)
 636                                 goto unfuse;
 637 
 638                         ip_hdr_len = ip_hdr_length_v6(mp1, ip6h);
 639                 }
 640 
 641                 /* Data length might be changed by FW_HOOKS */
 642                 tcph = (tcph_t *)&mp1->b_rptr[ip_hdr_len];
 643                 seq = ABE32_TO_U32(tcph->th_seq);
 644                 recv_size += seq - tcp->tcp_snxt;
 645 
 646                 /*
 647                  * The message duplicated by tcp_xmit_mp is freed.
 648                  * Note: the original message passed in remains unchanged.
 649                  */
 650                 freemsg(mp1);
 651         }
 652 
 653         mutex_enter(&peer_tcp->tcp_non_sq_lock);
 654         /*
 655          * Wake up and signal the peer; it is okay to do this before
 656          * enqueueing because we are holding the lock.  One of the
 657          * advantages of synchronous streams is the ability for us to
 658          * find out when the application performs a read on the socket,
 659          * by way of tcp_fuse_rrw() entry point being called.  Every
 660          * data that gets enqueued onto the receiver is treated as if
 661          * it has arrived at the receiving endpoint, thus generating
 662          * SIGPOLL/SIGIO for asynchronous socket just as in the strrput()
 663          * case.  However, we only wake up the application when necessary,
 664          * i.e. during the first enqueue.  When tcp_fuse_rrw() is called
 665          * it will send everything upstream.
 666          */
 667         if (peer_tcp->tcp_direct_sockfs && !urgent &&
 668             !TCP_IS_DETACHED(peer_tcp)) {
 669                 /* Update poll events and send SIGPOLL/SIGIO if necessary */
 670                 STR_WAKEUP_SENDSIG(STREAM(peer_tcp->tcp_rq),
 671                     peer_tcp->tcp_rcv_list);
 672         }
 673 
 674         /*
 675          * Enqueue data into the peer's receive list; we may or may not
 676          * drain the contents depending on the conditions below.
 677          */
 678         tcp_rcv_enqueue(peer_tcp, mp, recv_size);
 679 
 680         /* In case it wrapped around and also to keep it constant */
 681         peer_tcp->tcp_rwnd += recv_size;
 682         /*
 683          * We increase the peer's unread message count here whilst still
 684          * holding it's tcp_non_sq_lock. This ensures that the increment
 685          * occurs in the same lock acquisition perimeter as the enqueue.
 686          * Depending on lock hierarchy, we can release these locks which
 687          * creates a window in which we can race with tcp_fuse_rrw()
 688          */
 689         peer_tcp->tcp_fuse_rcv_unread_cnt++;
 690 
 691         /*
 692          * Exercise flow-control when needed; we will get back-enabled
 693          * in either tcp_accept_finish(), tcp_unfuse(), or tcp_fuse_rrw().
 694          * If tcp_direct_sockfs is on or if the peer endpoint is detached,
 695          * we emulate streams flow control by checking the peer's queue
 696          * size and high water mark; otherwise we simply use canputnext()
 697          * to decide if we need to stop our flow.
 698          *
 699          * The outstanding unread data block check does not apply for a
 700          * detached receiver; this is to avoid unnecessary blocking of the
 701          * sender while the accept is currently in progress and is quite
 702          * similar to the regular tcp.
 703          */
 704         if (TCP_IS_DETACHED(peer_tcp) || max_unread == 0)
 705                 max_unread = UINT_MAX;
 706 
 707         /*
 708          * Since we are accessing our tcp_flow_stopped and might modify it,
 709          * we need to take tcp->tcp_non_sq_lock. The lock for the highest
 710          * address is held first. Dropping peer_tcp->tcp_non_sq_lock should
 711          * not be an issue here since we are within the squeue and the peer
 712          * won't disappear.
 713          */
 714         if (tcp > peer_tcp) {
 715                 mutex_exit(&peer_tcp->tcp_non_sq_lock);
 716                 mutex_enter(&tcp->tcp_non_sq_lock);
 717                 mutex_enter(&peer_tcp->tcp_non_sq_lock);
 718         } else {
 719                 mutex_enter(&tcp->tcp_non_sq_lock);
 720         }
 721         flow_stopped = tcp->tcp_flow_stopped;
 722         if (((peer_tcp->tcp_direct_sockfs || TCP_IS_DETACHED(peer_tcp)) &&
 723             (peer_tcp->tcp_rcv_cnt >= peer_tcp->tcp_fuse_rcv_hiwater ||
 724             peer_tcp->tcp_fuse_rcv_unread_cnt >= max_unread)) ||
 725             (!peer_tcp->tcp_direct_sockfs && !TCP_IS_DETACHED(peer_tcp) &&
 726             !canputnext(peer_tcp->tcp_rq))) {
 727                 peer_data_queued = B_TRUE;
 728         }
 729 
 730         if (!flow_stopped && (peer_data_queued ||
 731             (TCP_UNSENT_BYTES(tcp) >= tcp->tcp_xmit_hiwater))) {
 732                 tcp_setqfull(tcp);
 733                 flow_stopped = B_TRUE;
 734                 TCP_STAT(tcps, tcp_fusion_flowctl);
 735                 DTRACE_PROBE4(tcp__fuse__output__flowctl, tcp_t *, tcp,
 736                     uint_t, send_size, uint_t, peer_tcp->tcp_rcv_cnt,
 737                     uint_t, peer_tcp->tcp_fuse_rcv_unread_cnt);
 738         } else if (flow_stopped && !peer_data_queued &&
 739             (TCP_UNSENT_BYTES(tcp) <= tcp->tcp_xmit_lowater)) {
 740                 tcp_clrqfull(tcp);
 741                 TCP_STAT(tcps, tcp_fusion_backenabled);
 742                 flow_stopped = B_FALSE;
 743         }
 744         mutex_exit(&tcp->tcp_non_sq_lock);
 745 
 746         /*
 747          * If we are in synchronous streams mode and the peer read queue is
 748          * not full then schedule a push timer if one is not scheduled
 749          * already. This is needed for applications which use MSG_PEEK to
 750          * determine the number of bytes available before issuing a 'real'
 751          * read. It also makes flow control more deterministic, particularly
 752          * for smaller message sizes.
 753          */
 754         if (!urgent && peer_tcp->tcp_direct_sockfs &&
 755             peer_tcp->tcp_push_tid == 0 && !TCP_IS_DETACHED(peer_tcp) &&
 756             canputnext(peer_tcp->tcp_rq)) {
 757                 peer_tcp->tcp_push_tid = TCP_TIMER(peer_tcp, tcp_push_timer,
 758                     MSEC_TO_TICK(tcps->tcps_push_timer_interval));
 759         }
 760         mutex_exit(&peer_tcp->tcp_non_sq_lock);
 761         ipst->ips_loopback_packets++;
 762         tcp->tcp_last_sent_len = send_size;
 763 
 764         /* Need to adjust the following SNMP MIB-related variables */
 765         tcp->tcp_snxt += send_size;
 766         tcp->tcp_suna = tcp->tcp_snxt;
 767         peer_tcp->tcp_rnxt += recv_size;
 768         peer_tcp->tcp_rack = peer_tcp->tcp_rnxt;
 769 
 770         BUMP_MIB(&tcps->tcps_mib, tcpOutDataSegs);
 771         UPDATE_MIB(&tcps->tcps_mib, tcpOutDataBytes, send_size);
 772 
 773         BUMP_MIB(&tcps->tcps_mib, tcpInSegs);
 774         BUMP_MIB(&tcps->tcps_mib, tcpInDataInorderSegs);
 775         UPDATE_MIB(&tcps->tcps_mib, tcpInDataInorderBytes, send_size);
 776 
 777         BUMP_LOCAL(tcp->tcp_obsegs);
 778         BUMP_LOCAL(peer_tcp->tcp_ibsegs);
 779 
 780         DTRACE_PROBE2(tcp__fuse__output, tcp_t *, tcp, uint_t, send_size);
 781 
 782         if (!TCP_IS_DETACHED(peer_tcp)) {
 783                 /*
 784                  * Drain the peer's receive queue it has urgent data or if
 785                  * we're not flow-controlled.  There is no need for draining
 786                  * normal data when tcp_direct_sockfs is on because the peer
 787                  * will pull the data via tcp_fuse_rrw().
 788                  */
 789                 if (urgent || (!flow_stopped && !peer_tcp->tcp_direct_sockfs)) {
 790                         ASSERT(peer_tcp->tcp_rcv_list != NULL);
 791                         /*
 792                          * For TLI-based streams, a thread in tcp_accept_swap()
 793                          * can race with us.  That thread will ensure that the
 794                          * correct peer_tcp->tcp_rq is globally visible before
 795                          * peer_tcp->tcp_detached is visible as clear, but we
 796                          * must also ensure that the load of tcp_rq cannot be
 797                          * reordered to be before the tcp_detached check.
 798                          */
 799                         membar_consumer();
 800                         (void) tcp_fuse_rcv_drain(peer_tcp->tcp_rq, peer_tcp,
 801                             NULL);
 802                         /*
 803                          * If synchronous streams was stopped above due
 804                          * to the presence of urgent data, re-enable it.
 805                          */
 806                         if (urgent)
 807                                 TCP_FUSE_SYNCSTR_UNPLUG_DRAIN(peer_tcp);
 808                 }
 809         }
 810         return (B_TRUE);
 811 unfuse:
 812         tcp_unfuse(tcp);
 813         return (B_FALSE);
 814 }
 815 
 816 /*
 817  * This routine gets called to deliver data upstream on a fused or
 818  * previously fused tcp loopback endpoint; the latter happens only
 819  * when there is a pending SIGURG signal plus urgent data that can't
 820  * be sent upstream in the past.
 821  */
 822 boolean_t
 823 tcp_fuse_rcv_drain(queue_t *q, tcp_t *tcp, mblk_t **sigurg_mpp)
 824 {
 825         mblk_t *mp;
 826 #ifdef DEBUG
 827         uint_t cnt = 0;
 828 #endif
 829         tcp_stack_t     *tcps = tcp->tcp_tcps;
 830         tcp_t           *peer_tcp = tcp->tcp_loopback_peer;
 831         boolean_t       sd_rd_eof = B_FALSE;
 832 
 833         ASSERT(tcp->tcp_loopback);
 834         ASSERT(tcp->tcp_fused || tcp->tcp_fused_sigurg);
 835         ASSERT(!tcp->tcp_fused || tcp->tcp_loopback_peer != NULL);
 836         ASSERT(sigurg_mpp != NULL || tcp->tcp_fused);
 837 
 838         /* No need for the push timer now, in case it was scheduled */
 839         if (tcp->tcp_push_tid != 0) {
 840                 (void) TCP_TIMER_CANCEL(tcp, tcp->tcp_push_tid);
 841                 tcp->tcp_push_tid = 0;
 842         }
 843         /*
 844          * If there's urgent data sitting in receive list and we didn't
 845          * get a chance to send up a SIGURG signal, make sure we send
 846          * it first before draining in order to ensure that SIOCATMARK
 847          * works properly.
 848          */
 849         if (tcp->tcp_fused_sigurg) {
 850                 /*
 851                  * sigurg_mpp is normally NULL, i.e. when we're still
 852                  * fused and didn't get here because of tcp_unfuse().
 853                  * In this case try hard to allocate the M_PCSIG mblk.
 854                  */
 855                 if (sigurg_mpp == NULL &&
 856                     (mp = allocb(1, BPRI_HI)) == NULL &&
 857                     (mp = allocb_tryhard(1)) == NULL) {
 858                         /* Alloc failed; try again next time */
 859                         tcp->tcp_push_tid = TCP_TIMER(tcp, tcp_push_timer,
 860                             MSEC_TO_TICK(tcps->tcps_push_timer_interval));
 861                         return (B_TRUE);
 862                 } else if (sigurg_mpp != NULL) {
 863                         /*
 864                          * Use the supplied M_PCSIG mblk; it means we're
 865                          * either unfused or in the process of unfusing,
 866                          * and the drain must happen now.
 867                          */
 868                         mp = *sigurg_mpp;
 869                         *sigurg_mpp = NULL;
 870                 }
 871                 ASSERT(mp != NULL);
 872 
 873                 tcp->tcp_fused_sigurg = B_FALSE;
 874                 /* Send up the signal */
 875                 DB_TYPE(mp) = M_PCSIG;
 876                 *mp->b_wptr++ = (uchar_t)SIGURG;
 877                 putnext(q, mp);
 878                 /*
 879                  * Let the regular tcp_rcv_drain() path handle
 880                  * draining the data if we're no longer fused.
 881                  */
 882                 if (!tcp->tcp_fused)
 883                         return (B_FALSE);
 884         }
 885 
 886         /*
 887          * In the synchronous streams case, we generate SIGPOLL/SIGIO for
 888          * each M_DATA that gets enqueued onto the receiver.  At this point
 889          * we are about to drain any queued data via putnext().  In order
 890          * to avoid extraneous signal generation from strrput(), we set
 891          * STRGETINPROG flag at the stream head prior to the draining and
 892          * restore it afterwards.  This masks out signal generation only
 893          * for M_DATA messages and does not affect urgent data. We only do
 894          * this if the STREOF flag is not set which can happen if the
 895          * application shuts down the read side of a stream. In this case
 896          * we simply free these messages to approximate the flushq behavior
 897          * which normally occurs when STREOF is on the stream head read queue.
 898          */
 899         if (tcp->tcp_direct_sockfs)
 900                 sd_rd_eof = strrput_sig(q, B_FALSE);
 901 
 902         /* Drain the data */
 903         while ((mp = tcp->tcp_rcv_list) != NULL) {
 904                 tcp->tcp_rcv_list = mp->b_next;
 905                 mp->b_next = NULL;
 906 #ifdef DEBUG
 907                 cnt += msgdsize(mp);
 908 #endif
 909                 if (sd_rd_eof) {
 910                         freemsg(mp);
 911                 } else {
 912                         putnext(q, mp);
 913                         TCP_STAT(tcps, tcp_fusion_putnext);
 914                 }
 915         }
 916 
 917         if (tcp->tcp_direct_sockfs && !sd_rd_eof)
 918                 (void) strrput_sig(q, B_TRUE);
 919 
 920         ASSERT(cnt == tcp->tcp_rcv_cnt);
 921         tcp->tcp_rcv_last_head = NULL;
 922         tcp->tcp_rcv_last_tail = NULL;
 923         tcp->tcp_rcv_cnt = 0;
 924         tcp->tcp_fuse_rcv_unread_cnt = 0;
 925         tcp->tcp_rwnd = q->q_hiwat;
 926 
 927         if (peer_tcp->tcp_flow_stopped && (TCP_UNSENT_BYTES(peer_tcp) <=
 928             peer_tcp->tcp_xmit_lowater)) {
 929                 tcp_clrqfull(peer_tcp);
 930                 TCP_STAT(tcps, tcp_fusion_backenabled);
 931         }
 932 
 933         return (B_TRUE);
 934 }
 935 
 936 /*
 937  * Synchronous stream entry point for sockfs to retrieve
 938  * data directly from tcp_rcv_list.
 939  * tcp_fuse_rrw() might end up modifying the peer's tcp_flow_stopped,
 940  * for which it  must take the tcp_non_sq_lock of the peer as well
 941  * making any change. The order of taking the locks is based on
 942  * the TCP pointer itself. Before we get the peer we need to take
 943  * our tcp_non_sq_lock so that the peer doesn't disappear. However,
 944  * we cannot drop the lock if we have to grab the peer's lock (because
 945  * of ordering), since the peer might disappear in the interim. So,
 946  * we take our tcp_non_sq_lock, get the peer, increment the ref on the
 947  * peer's conn, drop all the locks and then take the tcp_non_sq_lock in the
 948  * desired order. Incrementing the conn ref on the peer means that the
 949  * peer won't disappear when we drop our tcp_non_sq_lock.
 950  */
 951 int
 952 tcp_fuse_rrw(queue_t *q, struiod_t *dp)
 953 {
 954         tcp_t *tcp = Q_TO_CONN(q)->conn_tcp;
 955         mblk_t *mp;
 956         tcp_t *peer_tcp;
 957         tcp_stack_t     *tcps = tcp->tcp_tcps;
 958 
 959         mutex_enter(&tcp->tcp_non_sq_lock);
 960 
 961         /*
 962          * If tcp_fuse_syncstr_plugged is set, then another thread is moving
 963          * the underlying data to the stream head.  We need to wait until it's
 964          * done, then return EBUSY so that strget() will dequeue data from the
 965          * stream head to ensure data is drained in-order.
 966          */
 967 plugged:
 968         if (tcp->tcp_fuse_syncstr_plugged) {
 969                 do {
 970                         cv_wait(&tcp->tcp_fuse_plugcv, &tcp->tcp_non_sq_lock);
 971                 } while (tcp->tcp_fuse_syncstr_plugged);
 972 
 973                 mutex_exit(&tcp->tcp_non_sq_lock);
 974                 TCP_STAT(tcps, tcp_fusion_rrw_plugged);
 975                 TCP_STAT(tcps, tcp_fusion_rrw_busy);
 976                 return (EBUSY);
 977         }
 978 
 979         peer_tcp = tcp->tcp_loopback_peer;
 980 
 981         /*
 982          * If someone had turned off tcp_direct_sockfs or if synchronous
 983          * streams is stopped, we return EBUSY.  This causes strget() to
 984          * dequeue data from the stream head instead.
 985          */
 986         if (!tcp->tcp_direct_sockfs || tcp->tcp_fuse_syncstr_stopped) {
 987                 mutex_exit(&tcp->tcp_non_sq_lock);
 988                 TCP_STAT(tcps, tcp_fusion_rrw_busy);
 989                 return (EBUSY);
 990         }
 991 
 992         /*
 993          * Grab lock in order. The highest addressed tcp is locked first.
 994          * We don't do this within the tcp_rcv_list check since if we
 995          * have to drop the lock, for ordering, then the tcp_rcv_list
 996          * could change.
 997          */
 998         if (peer_tcp > tcp) {
 999                 CONN_INC_REF(peer_tcp->tcp_connp);
1000                 mutex_exit(&tcp->tcp_non_sq_lock);
1001                 mutex_enter(&peer_tcp->tcp_non_sq_lock);
1002                 mutex_enter(&tcp->tcp_non_sq_lock);
1003                 /*
1004                  * This might have changed in the interim
1005                  * Once read-side tcp_non_sq_lock is dropped above
1006                  * anything can happen, we need to check all
1007                  * known conditions again once we reaquire
1008                  * read-side tcp_non_sq_lock.
1009                  */
1010                 if (tcp->tcp_fuse_syncstr_plugged) {
1011                         mutex_exit(&peer_tcp->tcp_non_sq_lock);
1012                         CONN_DEC_REF(peer_tcp->tcp_connp);
1013                         goto plugged;
1014                 }
1015                 if (!tcp->tcp_direct_sockfs || tcp->tcp_fuse_syncstr_stopped) {
1016                         mutex_exit(&tcp->tcp_non_sq_lock);
1017                         mutex_exit(&peer_tcp->tcp_non_sq_lock);
1018                         CONN_DEC_REF(peer_tcp->tcp_connp);
1019                         TCP_STAT(tcps, tcp_fusion_rrw_busy);
1020                         return (EBUSY);
1021                 }
1022                 CONN_DEC_REF(peer_tcp->tcp_connp);
1023         } else {
1024                 mutex_enter(&peer_tcp->tcp_non_sq_lock);
1025         }
1026 
1027         if ((mp = tcp->tcp_rcv_list) != NULL) {
1028 
1029                 DTRACE_PROBE3(tcp__fuse__rrw, tcp_t *, tcp,
1030                     uint32_t, tcp->tcp_rcv_cnt, ssize_t, dp->d_uio.uio_resid);
1031 
1032                 tcp->tcp_rcv_list = NULL;
1033                 TCP_STAT(tcps, tcp_fusion_rrw_msgcnt);
1034 
1035                 /*
1036                  * At this point nothing should be left in tcp_rcv_list.
1037                  * The only possible case where we would have a chain of
1038                  * b_next-linked messages is urgent data, but we wouldn't
1039                  * be here if that's true since urgent data is delivered
1040                  * via putnext() and synchronous streams is stopped until
1041                  * tcp_fuse_rcv_drain() is finished.
1042                  */
1043                 ASSERT(DB_TYPE(mp) == M_DATA && mp->b_next == NULL);
1044 
1045                 tcp->tcp_rcv_last_head = NULL;
1046                 tcp->tcp_rcv_last_tail = NULL;
1047                 tcp->tcp_rcv_cnt = 0;
1048                 tcp->tcp_fuse_rcv_unread_cnt = 0;
1049 
1050                 if (peer_tcp->tcp_flow_stopped &&
1051                     (TCP_UNSENT_BYTES(peer_tcp) <=
1052                     peer_tcp->tcp_xmit_lowater)) {
1053                         tcp_clrqfull(peer_tcp);
1054                         TCP_STAT(tcps, tcp_fusion_backenabled);
1055                 }
1056         }
1057         mutex_exit(&peer_tcp->tcp_non_sq_lock);
1058         /*
1059          * Either we just dequeued everything or we get here from sockfs
1060          * and have nothing to return; in this case clear RSLEEP.
1061          */
1062         ASSERT(tcp->tcp_rcv_last_head == NULL);
1063         ASSERT(tcp->tcp_rcv_last_tail == NULL);
1064         ASSERT(tcp->tcp_rcv_cnt == 0);
1065         ASSERT(tcp->tcp_fuse_rcv_unread_cnt == 0);
1066         STR_WAKEUP_CLEAR(STREAM(q));
1067 
1068         mutex_exit(&tcp->tcp_non_sq_lock);
1069         dp->d_mp = mp;
1070         return (0);
1071 }
1072 
1073 /*
1074  * Synchronous stream entry point used by certain ioctls to retrieve
1075  * information about or peek into the tcp_rcv_list.
1076  */
1077 int
1078 tcp_fuse_rinfop(queue_t *q, infod_t *dp)
1079 {
1080         tcp_t   *tcp = Q_TO_CONN(q)->conn_tcp;
1081         mblk_t  *mp;
1082         uint_t  cmd = dp->d_cmd;
1083         int     res = 0;
1084         int     error = 0;
1085         struct stdata *stp = STREAM(q);
1086 
1087         mutex_enter(&tcp->tcp_non_sq_lock);
1088         /* If shutdown on read has happened, return nothing */
1089         mutex_enter(&stp->sd_lock);
1090         if (stp->sd_flag & STREOF) {
1091                 mutex_exit(&stp->sd_lock);
1092                 goto done;
1093         }
1094         mutex_exit(&stp->sd_lock);
1095 
1096         /*
1097          * It is OK not to return an answer if tcp_rcv_list is
1098          * currently not accessible.
1099          */
1100         if (!tcp->tcp_direct_sockfs || tcp->tcp_fuse_syncstr_stopped ||
1101             tcp->tcp_fuse_syncstr_plugged || (mp = tcp->tcp_rcv_list) == NULL)
1102                 goto done;
1103 
1104         if (cmd & INFOD_COUNT) {
1105                 /*
1106                  * We have at least one message and
1107                  * could return only one at a time.
1108                  */
1109                 dp->d_count++;
1110                 res |= INFOD_COUNT;
1111         }
1112         if (cmd & INFOD_BYTES) {
1113                 /*
1114                  * Return size of all data messages.
1115                  */
1116                 dp->d_bytes += tcp->tcp_rcv_cnt;
1117                 res |= INFOD_BYTES;
1118         }
1119         if (cmd & INFOD_FIRSTBYTES) {
1120                 /*
1121                  * Return size of first data message.
1122                  */
1123                 dp->d_bytes = msgdsize(mp);
1124                 res |= INFOD_FIRSTBYTES;
1125                 dp->d_cmd &= ~INFOD_FIRSTBYTES;
1126         }
1127         if (cmd & INFOD_COPYOUT) {
1128                 mblk_t *mp1;
1129                 int n;
1130 
1131                 if (DB_TYPE(mp) == M_DATA) {
1132                         mp1 = mp;
1133                 } else {
1134                         mp1 = mp->b_cont;
1135                         ASSERT(mp1 != NULL);
1136                 }
1137 
1138                 /*
1139                  * Return data contents of first message.
1140                  */
1141                 ASSERT(DB_TYPE(mp1) == M_DATA);
1142                 while (mp1 != NULL && dp->d_uiop->uio_resid > 0) {
1143                         n = MIN(dp->d_uiop->uio_resid, MBLKL(mp1));
1144                         if (n != 0 && (error = uiomove((char *)mp1->b_rptr, n,
1145                             UIO_READ, dp->d_uiop)) != 0) {
1146                                 goto done;
1147                         }
1148                         mp1 = mp1->b_cont;
1149                 }
1150                 res |= INFOD_COPYOUT;
1151                 dp->d_cmd &= ~INFOD_COPYOUT;
1152         }
1153 done:
1154         mutex_exit(&tcp->tcp_non_sq_lock);
1155 
1156         dp->d_res |= res;
1157 
1158         return (error);
1159 }
1160 
1161 /*
1162  * Enable synchronous streams on a fused tcp loopback endpoint.
1163  */
1164 static void
1165 tcp_fuse_syncstr_enable(tcp_t *tcp)
1166 {
1167         queue_t *rq = tcp->tcp_rq;
1168         struct stdata *stp = STREAM(rq);
1169 
1170         /* We can only enable synchronous streams for sockfs mode */
1171         tcp->tcp_direct_sockfs = tcp->tcp_issocket && do_tcp_direct_sockfs;
1172 
1173         if (!tcp->tcp_direct_sockfs)
1174                 return;
1175 
1176         mutex_enter(&stp->sd_lock);
1177         mutex_enter(QLOCK(rq));
1178 
1179         /*
1180          * We replace our q_qinfo with one that has the qi_rwp entry point.
1181          * Clear SR_SIGALLDATA because we generate the equivalent signal(s)
1182          * for every enqueued data in tcp_fuse_output().
1183          */
1184         rq->q_qinfo = &tcp_loopback_rinit;
1185         rq->q_struiot = tcp_loopback_rinit.qi_struiot;
1186         stp->sd_struiordq = rq;
1187         stp->sd_rput_opt &= ~SR_SIGALLDATA;
1188 
1189         mutex_exit(QLOCK(rq));
1190         mutex_exit(&stp->sd_lock);
1191 }
1192 
1193 /*
1194  * Disable synchronous streams on a fused tcp loopback endpoint.
1195  */
1196 static void
1197 tcp_fuse_syncstr_disable(tcp_t *tcp)
1198 {
1199         queue_t *rq = tcp->tcp_rq;
1200         struct stdata *stp = STREAM(rq);
1201 
1202         if (!tcp->tcp_direct_sockfs)
1203                 return;
1204 
1205         mutex_enter(&stp->sd_lock);
1206         mutex_enter(QLOCK(rq));
1207 
1208         /*
1209          * Reset q_qinfo to point to the default tcp entry points.
1210          * Also restore SR_SIGALLDATA so that strrput() can generate
1211          * the signals again for future M_DATA messages.
1212          */
1213         rq->q_qinfo = &tcp_rinitv4;      /* No open - same as rinitv6 */
1214         rq->q_struiot = tcp_rinitv4.qi_struiot;
1215         stp->sd_struiordq = NULL;
1216         stp->sd_rput_opt |= SR_SIGALLDATA;
1217         tcp->tcp_direct_sockfs = B_FALSE;
1218 
1219         mutex_exit(QLOCK(rq));
1220         mutex_exit(&stp->sd_lock);
1221 }
1222 
1223 /*
1224  * Enable synchronous streams on a pair of fused tcp endpoints.
1225  */
1226 void
1227 tcp_fuse_syncstr_enable_pair(tcp_t *tcp)
1228 {
1229         tcp_t *peer_tcp = tcp->tcp_loopback_peer;
1230 
1231         ASSERT(tcp->tcp_fused);
1232         ASSERT(peer_tcp != NULL);
1233 
1234         tcp_fuse_syncstr_enable(tcp);
1235         tcp_fuse_syncstr_enable(peer_tcp);
1236 }
1237 
1238 /*
1239  * Used to enable/disable signal generation at the stream head. We already
1240  * generated the signal(s) for these messages when they were enqueued on the
1241  * receiver. We also check if STREOF is set here. If it is, we return false
1242  * and let the caller decide what to do.
1243  */
1244 static boolean_t
1245 strrput_sig(queue_t *q, boolean_t on)
1246 {
1247         struct stdata *stp = STREAM(q);
1248 
1249         mutex_enter(&stp->sd_lock);
1250         if (stp->sd_flag == STREOF) {
1251                 mutex_exit(&stp->sd_lock);
1252                 return (B_TRUE);
1253         }
1254         if (on)
1255                 stp->sd_flag &= ~STRGETINPROG;
1256         else
1257                 stp->sd_flag |= STRGETINPROG;
1258         mutex_exit(&stp->sd_lock);
1259 
1260         return (B_FALSE);
1261 }
1262 
1263 /*
1264  * Disable synchronous streams on a pair of fused tcp endpoints and drain
1265  * any queued data; called either during unfuse or upon transitioning from
1266  * a socket to a stream endpoint due to _SIOCSOCKFALLBACK.
1267  */
1268 void
1269 tcp_fuse_disable_pair(tcp_t *tcp, boolean_t unfusing)
1270 {
1271         tcp_t *peer_tcp = tcp->tcp_loopback_peer;
1272         tcp_stack_t     *tcps = tcp->tcp_tcps;
1273 
1274         ASSERT(tcp->tcp_fused);
1275         ASSERT(peer_tcp != NULL);
1276 
1277         /*
1278          * Force any tcp_fuse_rrw() calls to block until we've moved the data
1279          * onto the stream head.
1280          */
1281         TCP_FUSE_SYNCSTR_PLUG_DRAIN(tcp);
1282         TCP_FUSE_SYNCSTR_PLUG_DRAIN(peer_tcp);
1283 
1284         /*
1285          * Cancel any pending push timers.
1286          */
1287         if (tcp->tcp_push_tid != 0) {
1288                 (void) TCP_TIMER_CANCEL(tcp, tcp->tcp_push_tid);
1289                 tcp->tcp_push_tid = 0;
1290         }
1291         if (peer_tcp->tcp_push_tid != 0) {
1292                 (void) TCP_TIMER_CANCEL(peer_tcp, peer_tcp->tcp_push_tid);
1293                 peer_tcp->tcp_push_tid = 0;
1294         }
1295 
1296         /*
1297          * Drain any pending data; the detached check is needed because
1298          * we may be called as a result of a tcp_unfuse() triggered by
1299          * tcp_fuse_output().  Note that in case of a detached tcp, the
1300          * draining will happen later after the tcp is unfused.  For non-
1301          * urgent data, this can be handled by the regular tcp_rcv_drain().
1302          * If we have urgent data sitting in the receive list, we will
1303          * need to send up a SIGURG signal first before draining the data.
1304          * All of these will be handled by the code in tcp_fuse_rcv_drain()
1305          * when called from tcp_rcv_drain().
1306          */
1307         if (!TCP_IS_DETACHED(tcp)) {
1308                 (void) tcp_fuse_rcv_drain(tcp->tcp_rq, tcp,
1309                     (unfusing ? &tcp->tcp_fused_sigurg_mp : NULL));
1310         }
1311         if (!TCP_IS_DETACHED(peer_tcp)) {
1312                 (void) tcp_fuse_rcv_drain(peer_tcp->tcp_rq, peer_tcp,
1313                     (unfusing ? &peer_tcp->tcp_fused_sigurg_mp : NULL));
1314         }
1315 
1316         /*
1317          * Make all current and future tcp_fuse_rrw() calls fail with EBUSY.
1318          * To ensure threads don't sneak past the checks in tcp_fuse_rrw(),
1319          * a given stream must be stopped prior to being unplugged (but the
1320          * ordering of operations between the streams is unimportant).
1321          */
1322         TCP_FUSE_SYNCSTR_STOP(tcp);
1323         TCP_FUSE_SYNCSTR_STOP(peer_tcp);
1324         TCP_FUSE_SYNCSTR_UNPLUG_DRAIN(tcp);
1325         TCP_FUSE_SYNCSTR_UNPLUG_DRAIN(peer_tcp);
1326 
1327         /* Lift up any flow-control conditions */
1328         if (tcp->tcp_flow_stopped) {
1329                 tcp_clrqfull(tcp);
1330                 TCP_STAT(tcps, tcp_fusion_backenabled);
1331         }
1332         if (peer_tcp->tcp_flow_stopped) {
1333                 tcp_clrqfull(peer_tcp);
1334                 TCP_STAT(tcps, tcp_fusion_backenabled);
1335         }
1336 
1337         /* Disable synchronous streams */
1338         tcp_fuse_syncstr_disable(tcp);
1339         tcp_fuse_syncstr_disable(peer_tcp);
1340 }
1341 
1342 /*
1343  * Calculate the size of receive buffer for a fused tcp endpoint.
1344  */
1345 size_t
1346 tcp_fuse_set_rcv_hiwat(tcp_t *tcp, size_t rwnd)
1347 {
1348         tcp_stack_t     *tcps = tcp->tcp_tcps;
1349 
1350         ASSERT(tcp->tcp_fused);
1351 
1352         /* Ensure that value is within the maximum upper bound */
1353         if (rwnd > tcps->tcps_max_buf)
1354                 rwnd = tcps->tcps_max_buf;
1355 
1356         /* Obey the absolute minimum tcp receive high water mark */
1357         if (rwnd < tcps->tcps_sth_rcv_hiwat)
1358                 rwnd = tcps->tcps_sth_rcv_hiwat;
1359 
1360         /*
1361          * Round up to system page size in case SO_RCVBUF is modified
1362          * after SO_SNDBUF; the latter is also similarly rounded up.
1363          */
1364         rwnd = P2ROUNDUP_TYPED(rwnd, PAGESIZE, size_t);
1365         tcp->tcp_fuse_rcv_hiwater = rwnd;
1366         return (rwnd);
1367 }
1368 
1369 /*
1370  * Calculate the maximum outstanding unread data block for a fused tcp endpoint.
1371  */
1372 int
1373 tcp_fuse_maxpsz_set(tcp_t *tcp)
1374 {
1375         tcp_t *peer_tcp = tcp->tcp_loopback_peer;
1376         uint_t sndbuf = tcp->tcp_xmit_hiwater;
1377         uint_t maxpsz = sndbuf;
1378 
1379         ASSERT(tcp->tcp_fused);
1380         ASSERT(peer_tcp != NULL);
1381         ASSERT(peer_tcp->tcp_fuse_rcv_hiwater != 0);
1382         /*
1383          * In the fused loopback case, we want the stream head to split
1384          * up larger writes into smaller chunks for a more accurate flow-
1385          * control accounting.  Our maxpsz is half of the sender's send
1386          * buffer or the receiver's receive buffer, whichever is smaller.
1387          * We round up the buffer to system page size due to the lack of
1388          * TCP MSS concept in Fusion.
1389          */
1390         if (maxpsz > peer_tcp->tcp_fuse_rcv_hiwater)
1391                 maxpsz = peer_tcp->tcp_fuse_rcv_hiwater;
1392         maxpsz = P2ROUNDUP_TYPED(maxpsz, PAGESIZE, uint_t) >> 1;
1393 
1394         /*
1395          * Calculate the peer's limit for the number of outstanding unread
1396          * data block.  This is the amount of data blocks that are allowed
1397          * to reside in the receiver's queue before the sender gets flow
1398          * controlled.  It is used only in the synchronous streams mode as
1399          * a way to throttle the sender when it performs consecutive writes
1400          * faster than can be read.  The value is derived from SO_SNDBUF in
1401          * order to give the sender some control; we divide it with a large
1402          * value (16KB) to produce a fairly low initial limit.
1403          */
1404         if (tcp_fusion_rcv_unread_min == 0) {
1405                 /* A value of 0 means that we disable the check */
1406                 peer_tcp->tcp_fuse_rcv_unread_hiwater = 0;
1407         } else {
1408                 peer_tcp->tcp_fuse_rcv_unread_hiwater =
1409                     MAX(sndbuf >> 14, tcp_fusion_rcv_unread_min);
1410         }
1411         return (maxpsz);
1412 }