Move CallBack Server thread creation, initial processing and destruction to RPC
Cleanup some RPC code.
Remove extraneous fields from nfs41_cb_info and clean up the code.
Change KM_SLEEP in mir_nfs41_callback_thread to KM_NOSLEEP.
Fix lint warnings
Incorporate code review comments.

   1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
  23  * Use is subject to license terms.
  24  */
  25 /* Copyright (c) 1990 Mentat Inc. */
  26 
  27 /*      Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T     */
  28 /*        All Rights Reserved   */
  29 
  30 /*
  31  * Kernel RPC filtering module
  32  */
  33 
  34 #include <sys/param.h>
  35 #include <sys/types.h>
  36 #include <sys/stream.h>
  37 #include <sys/stropts.h>
  38 #include <sys/tihdr.h>
  39 #include <sys/timod.h>
  40 #include <sys/tiuser.h>
  41 #include <sys/debug.h>
  42 #include <sys/signal.h>
  43 #include <sys/pcb.h>
  44 #include <sys/user.h>
  45 #include <sys/errno.h>
  46 #include <sys/cred.h>
  47 #include <sys/policy.h>
  48 #include <sys/inline.h>
  49 #include <sys/cmn_err.h>
  50 #include <sys/kmem.h>
  51 #include <sys/file.h>
  52 #include <sys/sysmacros.h>
  53 #include <sys/systm.h>
  54 #include <sys/t_lock.h>
  55 #include <sys/ddi.h>
  56 #include <sys/vtrace.h>
  57 #include <sys/callb.h>
  58 #include <sys/strsun.h>
  59 
  60 #include <sys/strlog.h>
  61 #include <rpc/rpc_com.h>
  62 #include <inet/common.h>
  63 #include <rpc/types.h>
  64 #include <sys/time.h>
  65 #include <rpc/xdr.h>
  66 #include <rpc/auth.h>
  67 #include <rpc/clnt.h>
  68 #include <rpc/rpc_msg.h>
  69 #include <rpc/clnt.h>
  70 #include <rpc/svc.h>
  71 #include <rpc/rpcsys.h>
  72 #include <rpc/rpc_rdma.h>
  73 #include <sys/sdt.h>
  74 
  75 /*
  76  * This is the loadable module wrapper.
  77  */
  78 #include <sys/conf.h>
  79 #include <sys/modctl.h>
  80 #include <sys/syscall.h>
  81 
  82 extern struct streamtab rpcinfo;
  83 
  84 static struct fmodsw fsw = {
  85         "rpcmod",
  86         &rpcinfo,
  87         D_NEW|D_MP,
  88 };
  89 
  90 /*
  91  * Module linkage information for the kernel.
  92  */
  93 
  94 static struct modlstrmod modlstrmod = {
  95         &mod_strmodops, "rpc interface str mod", &fsw
  96 };
  97 
  98 /*
  99  * For the RPC system call.
 100  */
 101 static struct sysent rpcsysent = {
 102         2,
 103         SE_32RVAL1 | SE_ARGC | SE_NOUNLOAD,
 104         rpcsys
 105 };
 106 
 107 static struct modlsys modlsys = {
 108         &mod_syscallops,
 109         "RPC syscall",
 110         &rpcsysent
 111 };
 112 
 113 #ifdef _SYSCALL32_IMPL
 114 static struct modlsys modlsys32 = {
 115         &mod_syscallops32,
 116         "32-bit RPC syscall",
 117         &rpcsysent
 118 };
 119 #endif /* _SYSCALL32_IMPL */
 120 
 121 static struct modlinkage modlinkage = {
 122         MODREV_1,
 123         {
 124                 &modlsys,
 125 #ifdef _SYSCALL32_IMPL
 126                 &modlsys32,
 127 #endif
 128                 &modlstrmod,
 129                 NULL
 130         }
 131 };
 132 
 133 int
 134 _init(void)
 135 {
 136         int error = 0;
 137         callb_id_t cid;
 138         int status;
 139 
 140         svc_init();
 141         clnt_init();
 142         cid = callb_add(connmgr_cpr_reset, 0, CB_CL_CPR_RPC, "rpc");
 143 
 144         if (error = mod_install(&modlinkage)) {
 145                 /*
 146                  * Could not install module, cleanup previous
 147                  * initialization work.
 148                  */
 149                 clnt_fini();
 150                 if (cid != NULL)
 151                         (void) callb_delete(cid);
 152 
 153                 return (error);
 154         }
 155 
 156         /*
 157          * Load up the RDMA plugins and initialize the stats. Even if the
 158          * plugins loadup fails, but rpcmod was successfully installed the
 159          * counters still get initialized.
 160          */
 161         rw_init(&rdma_lock, NULL, RW_DEFAULT, NULL);
 162         mutex_init(&rdma_modload_lock, NULL, MUTEX_DEFAULT, NULL);
 163         mt_kstat_init();
 164 
 165         /*
 166          * Get our identification into ldi.  This is used for loading
 167          * other modules, e.g. rpcib.
 168          */
 169         status = ldi_ident_from_mod(&modlinkage, &rpcmod_li);
 170         if (status != 0) {
 171                 cmn_err(CE_WARN, "ldi_ident_from_mod fails with %d", status);
 172                 rpcmod_li = NULL;
 173         }
 174 
 175         return (error);
 176 }
 177 
 178 /*
 179  * The unload entry point fails, because we advertise entry points into
 180  * rpcmod from the rest of kRPC: rpcmod_release().
 181  */
 182 int
 183 _fini(void)
 184 {
 185         return (EBUSY);
 186 }
 187 
 188 int
 189 _info(struct modinfo *modinfop)
 190 {
 191         return (mod_info(&modlinkage, modinfop));
 192 }
 193 
 194 extern int nulldev();
 195 
 196 #define RPCMOD_ID       2049
 197 
 198 int rmm_open(), rmm_close();
 199 
 200 /*
 201  * To save instructions, since STREAMS ignores the return value
 202  * from these functions, they are defined as void here. Kind of icky, but...
 203  */
 204 void rmm_rput(queue_t *, mblk_t *);
 205 void rmm_wput(queue_t *, mblk_t *);
 206 void rmm_rsrv(queue_t *);
 207 void rmm_wsrv(queue_t *);
 208 
 209 int rpcmodopen(), rpcmodclose();
 210 void rpcmodrput(), rpcmodwput();
 211 void rpcmodrsrv(), rpcmodwsrv();
 212 
 213 static  void    rpcmodwput_other(queue_t *, mblk_t *);
 214 static  int     mir_close(queue_t *q);
 215 static  int     mir_open(queue_t *q, dev_t *devp, int flag, int sflag,
 216                     cred_t *credp);
 217 static  void    mir_rput(queue_t *q, mblk_t *mp);
 218 static  void    mir_rsrv(queue_t *q);
 219 static  void    mir_wput(queue_t *q, mblk_t *mp);
 220 static  void    mir_wsrv(queue_t *q);
 221 
 222 static struct module_info rpcmod_info =
 223         {RPCMOD_ID, "rpcmod", 0, INFPSZ, 256*1024, 1024};
 224 
 225 /*
 226  * Read side has no service procedure.
 227  */
 228 static struct qinit rpcmodrinit = {
 229         (int (*)())rmm_rput,
 230         (int (*)())rmm_rsrv,
 231         rmm_open,
 232         rmm_close,
 233         nulldev,
 234         &rpcmod_info,
 235         NULL
 236 };
 237 
 238 /*
 239  * The write put procedure is simply putnext to conserve stack space.
 240  * The write service procedure is not used to queue data, but instead to
 241  * synchronize with flow control.
 242  */
 243 static struct qinit rpcmodwinit = {
 244         (int (*)())rmm_wput,
 245         (int (*)())rmm_wsrv,
 246         rmm_open,
 247         rmm_close,
 248         nulldev,
 249         &rpcmod_info,
 250         NULL
 251 };
 252 struct streamtab rpcinfo = { &rpcmodrinit, &rpcmodwinit, NULL, NULL };
 253 
 254 struct xprt_style_ops {
 255         int (*xo_open)();
 256         int (*xo_close)();
 257         void (*xo_wput)();
 258         void (*xo_wsrv)();
 259         void (*xo_rput)();
 260         void (*xo_rsrv)();
 261 };
 262 
 263 static struct xprt_style_ops xprt_clts_ops = {
 264         rpcmodopen,
 265         rpcmodclose,
 266         rpcmodwput,
 267         rpcmodwsrv,
 268         rpcmodrput,
 269         NULL
 270 };
 271 
 272 static struct xprt_style_ops xprt_cots_ops = {
 273         mir_open,
 274         mir_close,
 275         mir_wput,
 276         mir_wsrv,
 277         mir_rput,
 278         mir_rsrv
 279 };
 280 
 281 /*
 282  * Per rpcmod "slot" data structure. q->q_ptr points to one of these.
 283  */
 284 struct rpcm {
 285         void            *rm_krpc_cell;  /* Reserved for use by KRPC */
 286         struct          xprt_style_ops  *rm_ops;
 287         int             rm_type;        /* Client or server side stream */
 288 #define RM_CLOSING      0x1             /* somebody is trying to close slot */
 289         uint_t          rm_state;       /* state of the slot. see above */
 290         uint_t          rm_ref;         /* cnt of external references to slot */
 291         kmutex_t        rm_lock;        /* mutex protecting above fields */
 292         kcondvar_t      rm_cwait;       /* condition for closing */
 293         zoneid_t        rm_zoneid;      /* zone which pushed rpcmod */
 294 };
 295 
 296 struct temp_slot {
 297         void *cell;
 298         struct xprt_style_ops *ops;
 299         int type;
 300         mblk_t *info_ack;
 301         kmutex_t lock;
 302         kcondvar_t wait;
 303 };
 304 
 305 typedef struct mir_s {
 306         void    *mir_krpc_cell; /* Reserved for KRPC use. This field */
 307                                         /* must be first in the structure. */
 308         struct xprt_style_ops   *rm_ops;
 309         int     mir_type;               /* Client or server side stream */
 310 
 311         mblk_t  *mir_head_mp;           /* RPC msg in progress */
 312                 /*
 313                  * mir_head_mp points the first mblk being collected in
 314                  * the current RPC message.  Record headers are removed
 315                  * before data is linked into mir_head_mp.
 316                  */
 317         mblk_t  *mir_tail_mp;           /* Last mblk in mir_head_mp */
 318                 /*
 319                  * mir_tail_mp points to the last mblk in the message
 320                  * chain starting at mir_head_mp.  It is only valid
 321                  * if mir_head_mp is non-NULL and is used to add new
 322                  * data blocks to the end of chain quickly.
 323                  */
 324 
 325         int32_t mir_frag_len;           /* Bytes seen in the current frag */
 326                 /*
 327                  * mir_frag_len starts at -4 for beginning of each fragment.
 328                  * When this length is negative, it indicates the number of
 329                  * bytes that rpcmod needs to complete the record marker
 330                  * header.  When it is positive or zero, it holds the number
 331                  * of bytes that have arrived for the current fragment and
 332                  * are held in mir_header_mp.
 333                  */
 334 
 335         int32_t mir_frag_header;
 336                 /*
 337                  * Fragment header as collected for the current fragment.
 338                  * It holds the last-fragment indicator and the number
 339                  * of bytes in the fragment.
 340                  */
 341 
 342         unsigned int
 343                 mir_ordrel_pending : 1, /* Sent T_ORDREL_REQ */
 344                 mir_hold_inbound : 1,   /* Hold inbound messages on server */
 345                                         /* side until outbound flow control */
 346                                         /* is relieved. */
 347                 mir_closing : 1,        /* The stream is being closed */
 348                 mir_inrservice : 1,     /* data queued or rd srv proc running */
 349                 mir_inwservice : 1,     /* data queued or wr srv proc running */
 350                 mir_inwflushdata : 1,   /* flush M_DATAs when srv runs */
 351                 /*
 352                  * On client streams, mir_clntreq is 0 or 1; it is set
 353                  * to 1 whenever a new request is sent out (mir_wput)
 354                  * and cleared when the timer fires (mir_timer).  If
 355                  * the timer fires with this value equal to 0, then the
 356                  * stream is considered idle and KRPC is notified.
 357                  */
 358                 mir_clntreq : 1,
 359                 /*
 360                  * On server streams, stop accepting messages
 361                  */
 362                 mir_svc_no_more_msgs : 1,
 363                 mir_listen_stream : 1,  /* listen end point */
 364                 mir_unused : 1, /* no longer used */
 365                 mir_timer_call : 1,
 366                 mir_junk_fill_thru_bit_31 : 21;
 367 
 368         int     mir_setup_complete;     /* server has initialized everything */
 369         timeout_id_t mir_timer_id;      /* Timer for idle checks */
 370         clock_t mir_idle_timeout;       /* Allowed idle time before shutdown */
 371                 /*
 372                  * This value is copied from clnt_idle_timeout or
 373                  * svc_idle_timeout during the appropriate ioctl.
 374                  * Kept in milliseconds
 375                  */
 376         clock_t mir_use_timestamp;      /* updated on client with each use */
 377                 /*
 378                  * This value is set to lbolt
 379                  * every time a client stream sends or receives data.
 380                  * Even if the timer message arrives, we don't shutdown
 381                  * client unless:
 382                  *    lbolt >= MSEC_TO_TICK(mir_idle_timeout)+mir_use_timestamp.
 383                  * This value is kept in HZ.
 384                  */
 385 
 386         uint_t  *mir_max_msg_sizep;     /* Reference to sanity check size */
 387                 /*
 388                  * This pointer is set to &clnt_max_msg_size or
 389                  * &svc_max_msg_size during the appropriate ioctl.
 390                  */
 391         zoneid_t mir_zoneid;    /* zone which pushed rpcmod */
 392         /* Server-side fields. */
 393         int     mir_ref_cnt;            /* Reference count: server side only */
 394                                         /* counts the number of references */
 395                                         /* that a kernel RPC server thread */
 396                                         /* (see svc_run()) has on this rpcmod */
 397                                         /* slot. Effectively, it is the */
 398                                         /* number * of unprocessed messages */
 399                                         /* that have been passed up to the */
 400                                         /* KRPC layer */
 401 
 402         mblk_t  *mir_svc_pend_mp;       /* Pending T_ORDREL_IND or */
 403                                         /* T_DISCON_IND */
 404 
 405         /*
 406          * these fields are for both client and server, but for debugging,
 407          * it is easier to have these last in the structure.
 408          */
 409         kmutex_t        mir_mutex;      /* Mutex and condvar for close */
 410         kcondvar_t      mir_condvar;    /* synchronization. */
 411         kcondvar_t      mir_timer_cv;   /* Timer routine sync. */
 412         void            *mir_cb;        /* For callbacks */
 413 } mir_t;
 414 
 415 void tmp_rput(queue_t *q, mblk_t *mp);
 416 
 417 struct xprt_style_ops tmpops = {
 418         NULL,
 419         NULL,
 420         putnext,
 421         NULL,
 422         tmp_rput,
 423         NULL
 424 };
 425 
 426 void
 427 tmp_rput(queue_t *q, mblk_t *mp)
 428 {
 429         struct temp_slot *t = (struct temp_slot *)(q->q_ptr);
 430         struct T_info_ack *pptr;
 431 
 432         switch (mp->b_datap->db_type) {
 433         case M_PCPROTO:
 434                 pptr = (struct T_info_ack *)mp->b_rptr;
 435                 switch (pptr->PRIM_type) {
 436                 case T_INFO_ACK:
 437                         mutex_enter(&t->lock);
 438                         t->info_ack = mp;
 439                         cv_signal(&t->wait);
 440                         mutex_exit(&t->lock);
 441                         return;
 442                 default:
 443                         break;
 444                 }
 445         default:
 446                 break;
 447         }
 448 
 449         /*
 450          * Not an info-ack, so free it. This is ok because we should
 451          * not be receiving data until the open finishes: rpcmod
 452          * is pushed well before the end-point is bound to an address.
 453          */
 454         freemsg(mp);
 455 }
 456 
 457 int
 458 rmm_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
 459 {
 460         mblk_t *bp;
 461         struct temp_slot ts, *t;
 462         struct T_info_ack *pptr;
 463         int error = 0;
 464 
 465         ASSERT(q != NULL);
 466         /*
 467          * Check for re-opens.
 468          */
 469         if (q->q_ptr) {
 470                 TRACE_1(TR_FAC_KRPC, TR_RPCMODOPEN_END,
 471                     "rpcmodopen_end:(%s)", "q->qptr");
 472                 return (0);
 473         }
 474 
 475         t = &ts;
 476         bzero(t, sizeof (*t));
 477         q->q_ptr = (void *)t;
 478         WR(q)->q_ptr = (void *)t;
 479 
 480         /*
 481          * Allocate the required messages upfront.
 482          */
 483         if ((bp = allocb(sizeof (struct T_info_req) +
 484             sizeof (struct T_info_ack), BPRI_LO)) == (mblk_t *)NULL) {
 485                 return (ENOBUFS);
 486         }
 487 
 488         mutex_init(&t->lock, NULL, MUTEX_DEFAULT, NULL);
 489         cv_init(&t->wait, NULL, CV_DEFAULT, NULL);
 490 
 491         t->ops = &tmpops;
 492 
 493         qprocson(q);
 494         bp->b_datap->db_type = M_PCPROTO;
 495         *(int32_t *)bp->b_wptr = (int32_t)T_INFO_REQ;
 496         bp->b_wptr += sizeof (struct T_info_req);
 497         putnext(WR(q), bp);
 498 
 499         mutex_enter(&t->lock);
 500         while (t->info_ack == NULL) {
 501                 if (cv_wait_sig(&t->wait, &t->lock) == 0) {
 502                         error = EINTR;
 503                         break;
 504                 }
 505         }
 506         mutex_exit(&t->lock);
 507 
 508         if (error)
 509                 goto out;
 510 
 511         pptr = (struct T_info_ack *)t->info_ack->b_rptr;
 512 
 513         if (pptr->SERV_type == T_CLTS) {
 514                 if ((error = rpcmodopen(q, devp, flag, sflag, crp)) == 0)
 515                         ((struct rpcm *)q->q_ptr)->rm_ops = &xprt_clts_ops;
 516         } else {
 517                 if ((error = mir_open(q, devp, flag, sflag, crp)) == 0)
 518                         ((mir_t *)q->q_ptr)->rm_ops = &xprt_cots_ops;
 519         }
 520 
 521 out:
 522         if (error)
 523                 qprocsoff(q);
 524 
 525         freemsg(t->info_ack);
 526         mutex_destroy(&t->lock);
 527         cv_destroy(&t->wait);
 528 
 529         return (error);
 530 }
 531 
 532 void
 533 rmm_rput(queue_t *q, mblk_t  *mp)
 534 {
 535         (*((struct temp_slot *)q->q_ptr)->ops->xo_rput)(q, mp);
 536 }
 537 
 538 void
 539 rmm_rsrv(queue_t *q)
 540 {
 541         (*((struct temp_slot *)q->q_ptr)->ops->xo_rsrv)(q);
 542 }
 543 
 544 void
 545 rmm_wput(queue_t *q, mblk_t *mp)
 546 {
 547         (*((struct temp_slot *)q->q_ptr)->ops->xo_wput)(q, mp);
 548 }
 549 
 550 void
 551 rmm_wsrv(queue_t *q)
 552 {
 553         (*((struct temp_slot *)q->q_ptr)->ops->xo_wsrv)(q);
 554 }
 555 
 556 int
 557 rmm_close(queue_t *q, int flag, cred_t *crp)
 558 {
 559         return ((*((struct temp_slot *)q->q_ptr)->ops->xo_close)(q, flag, crp));
 560 }
 561 
 562 /*
 563  * rpcmodopen - open routine gets called when the module gets pushed
 564  *              onto the stream.
 565  */
 566 /*ARGSUSED*/
 567 int
 568 rpcmodopen(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
 569 {
 570         struct rpcm *rmp;
 571 
 572         extern void (*rpc_rele)(queue_t *, mblk_t *);
 573         static void rpcmod_release(queue_t *, mblk_t *);
 574 
 575         TRACE_0(TR_FAC_KRPC, TR_RPCMODOPEN_START, "rpcmodopen_start:");
 576 
 577         /*
 578          * Initialize entry points to release a rpcmod slot (and an input
 579          * message if supplied) and to send an output message to the module
 580          * below rpcmod.
 581          */
 582         if (rpc_rele == NULL)
 583                 rpc_rele = rpcmod_release;
 584 
 585         /*
 586          * Only sufficiently privileged users can use this module, and it
 587          * is assumed that they will use this module properly, and NOT send
 588          * bulk data from downstream.
 589          */
 590         if (secpolicy_rpcmod_open(crp) != 0)
 591                 return (EPERM);
 592 
 593         /*
 594          * Allocate slot data structure.
 595          */
 596         rmp = kmem_zalloc(sizeof (*rmp), KM_SLEEP);
 597 
 598         mutex_init(&rmp->rm_lock, NULL, MUTEX_DEFAULT, NULL);
 599         cv_init(&rmp->rm_cwait, NULL, CV_DEFAULT, NULL);
 600         rmp->rm_zoneid = rpc_zoneid();
 601         /*
 602          * slot type will be set by kRPC client and server ioctl's
 603          */
 604         rmp->rm_type = 0;
 605 
 606         q->q_ptr = (void *)rmp;
 607         WR(q)->q_ptr = (void *)rmp;
 608 
 609         TRACE_1(TR_FAC_KRPC, TR_RPCMODOPEN_END, "rpcmodopen_end:(%s)", "end");
 610         return (0);
 611 }
 612 
 613 /*
 614  * rpcmodclose - This routine gets called when the module gets popped
 615  * off of the stream.
 616  */
 617 /*ARGSUSED*/
 618 int
 619 rpcmodclose(queue_t *q, int flag, cred_t *crp)
 620 {
 621         struct rpcm *rmp;
 622 
 623         ASSERT(q != NULL);
 624         rmp = (struct rpcm *)q->q_ptr;
 625 
 626         /*
 627          * Mark our state as closing.
 628          */
 629         mutex_enter(&rmp->rm_lock);
 630         rmp->rm_state |= RM_CLOSING;
 631 
 632         /*
 633          * Check and see if there are any messages on the queue.  If so, send
 634          * the messages, regardless whether the downstream module is ready to
 635          * accept data.
 636          */
 637         if (rmp->rm_type == RPC_SERVER) {
 638                 flushq(q, FLUSHDATA);
 639 
 640                 qenable(WR(q));
 641 
 642                 if (rmp->rm_ref) {
 643                         mutex_exit(&rmp->rm_lock);
 644                         /*
 645                          * call into SVC to clean the queue
 646                          */
 647                         svc_queueclean(q);
 648                         mutex_enter(&rmp->rm_lock);
 649 
 650                         /*
 651                          * Block while there are kRPC threads with a reference
 652                          * to this message.
 653                          */
 654                         while (rmp->rm_ref)
 655                                 cv_wait(&rmp->rm_cwait, &rmp->rm_lock);
 656                 }
 657 
 658                 mutex_exit(&rmp->rm_lock);
 659 
 660                 /*
 661                  * It is now safe to remove this queue from the stream. No kRPC
 662                  * threads have a reference to the stream, and none ever will,
 663                  * because RM_CLOSING is set.
 664                  */
 665                 qprocsoff(q);
 666 
 667                 /* Notify kRPC that this stream is going away. */
 668                 svc_queueclose(q);
 669         } else {
 670                 mutex_exit(&rmp->rm_lock);
 671                 qprocsoff(q);
 672         }
 673 
 674         q->q_ptr = NULL;
 675         WR(q)->q_ptr = NULL;
 676         mutex_destroy(&rmp->rm_lock);
 677         cv_destroy(&rmp->rm_cwait);
 678         kmem_free(rmp, sizeof (*rmp));
 679         return (0);
 680 }
 681 
 682 #ifdef  DEBUG
 683 int     rpcmod_send_msg_up = 0;
 684 int     rpcmod_send_uderr = 0;
 685 int     rpcmod_send_dup = 0;
 686 int     rpcmod_send_dup_cnt = 0;
 687 #endif
 688 
 689 /*
 690  * rpcmodrput - Module read put procedure.  This is called from
 691  *              the module, driver, or stream head downstream.
 692  */
 693 void
 694 rpcmodrput(queue_t *q, mblk_t *mp)
 695 {
 696         struct rpcm *rmp;
 697         union T_primitives *pptr;
 698         int hdrsz;
 699 
 700         TRACE_0(TR_FAC_KRPC, TR_RPCMODRPUT_START, "rpcmodrput_start:");
 701 
 702         ASSERT(q != NULL);
 703         rmp = (struct rpcm *)q->q_ptr;
 704 
 705         if (rmp->rm_type == 0) {
 706                 freemsg(mp);
 707                 return;
 708         }
 709 
 710 #ifdef DEBUG
 711         if (rpcmod_send_msg_up > 0) {
 712                 mblk_t *nmp = copymsg(mp);
 713                 if (nmp) {
 714                         putnext(q, nmp);
 715                         rpcmod_send_msg_up--;
 716                 }
 717         }
 718         if ((rpcmod_send_uderr > 0) && mp->b_datap->db_type == M_PROTO) {
 719                 mblk_t *nmp;
 720                 struct T_unitdata_ind *data;
 721                 struct T_uderror_ind *ud;
 722                 int d;
 723                 data = (struct T_unitdata_ind *)mp->b_rptr;
 724                 if (data->PRIM_type == T_UNITDATA_IND) {
 725                         d = sizeof (*ud) - sizeof (*data);
 726                         nmp = allocb(mp->b_wptr - mp->b_rptr + d, BPRI_HI);
 727                         if (nmp) {
 728                                 ud = (struct T_uderror_ind *)nmp->b_rptr;
 729                                 ud->PRIM_type = T_UDERROR_IND;
 730                                 ud->DEST_length = data->SRC_length;
 731                                 ud->DEST_offset = data->SRC_offset + d;
 732                                 ud->OPT_length = data->OPT_length;
 733                                 ud->OPT_offset = data->OPT_offset + d;
 734                                 ud->ERROR_type = ENETDOWN;
 735                                 if (data->SRC_length) {
 736                                         bcopy(mp->b_rptr +
 737                                             data->SRC_offset,
 738                                             nmp->b_rptr +
 739                                             ud->DEST_offset,
 740                                             data->SRC_length);
 741                                 }
 742                                 if (data->OPT_length) {
 743                                         bcopy(mp->b_rptr +
 744                                             data->OPT_offset,
 745                                             nmp->b_rptr +
 746                                             ud->OPT_offset,
 747                                             data->OPT_length);
 748                                 }
 749                                 nmp->b_wptr += d;
 750                                 nmp->b_wptr += (mp->b_wptr - mp->b_rptr);
 751                                 nmp->b_datap->db_type = M_PROTO;
 752                                 putnext(q, nmp);
 753                                 rpcmod_send_uderr--;
 754                         }
 755                 }
 756         }
 757 #endif
 758         switch (mp->b_datap->db_type) {
 759         default:
 760                 putnext(q, mp);
 761                 break;
 762 
 763         case M_PROTO:
 764         case M_PCPROTO:
 765                 ASSERT((mp->b_wptr - mp->b_rptr) >= sizeof (int32_t));
 766                 pptr = (union T_primitives *)mp->b_rptr;
 767 
 768                 /*
 769                  * Forward this message to krpc if it is data.
 770                  */
 771                 if (pptr->type == T_UNITDATA_IND) {
 772                         mblk_t *nmp;
 773 
 774                 /*
 775                  * Check if the module is being popped.
 776                  */
 777                         mutex_enter(&rmp->rm_lock);
 778                         if (rmp->rm_state & RM_CLOSING) {
 779                                 mutex_exit(&rmp->rm_lock);
 780                                 putnext(q, mp);
 781                                 break;
 782                         }
 783 
 784                         switch (rmp->rm_type) {
 785                         case RPC_CLIENT:
 786                                 mutex_exit(&rmp->rm_lock);
 787                                 hdrsz = mp->b_wptr - mp->b_rptr;
 788 
 789                                 /*
 790                                  * Make sure the header is sane.
 791                                  */
 792                                 if (hdrsz < TUNITDATAINDSZ ||
 793                                     hdrsz < (pptr->unitdata_ind.OPT_length +
 794                                     pptr->unitdata_ind.OPT_offset) ||
 795                                     hdrsz < (pptr->unitdata_ind.SRC_length +
 796                                     pptr->unitdata_ind.SRC_offset)) {
 797                                         freemsg(mp);
 798                                         return;
 799                                 }
 800 
 801                                 /*
 802                                  * Call clnt_clts_dispatch_notify, so that it
 803                                  * can pass the message to the proper caller.
 804                                  * Don't discard the header just yet since the
 805                                  * client may need the sender's address.
 806                                  */
 807                                 clnt_clts_dispatch_notify(mp, hdrsz,
 808                                     rmp->rm_zoneid);
 809                                 return;
 810                         case RPC_SERVER:
 811                                 /*
 812                                  * rm_krpc_cell is exclusively used by the kRPC
 813                                  * CLTS server
 814                                  */
 815                                 if (rmp->rm_krpc_cell) {
 816 #ifdef DEBUG
 817                                         /*
 818                                          * Test duplicate request cache and
 819                                          * rm_ref count handling by sending a
 820                                          * duplicate every so often, if
 821                                          * desired.
 822                                          */
 823                                         if (rpcmod_send_dup &&
 824                                             rpcmod_send_dup_cnt++ %
 825                                             rpcmod_send_dup)
 826                                                 nmp = copymsg(mp);
 827                                         else
 828                                                 nmp = NULL;
 829 #endif
 830                                         /*
 831                                          * Raise the reference count on this
 832                                          * module to prevent it from being
 833                                          * popped before krpc generates the
 834                                          * reply.
 835                                          */
 836                                         rmp->rm_ref++;
 837                                         mutex_exit(&rmp->rm_lock);
 838 
 839                                         /*
 840                                          * Submit the message to krpc.
 841                                          */
 842                                         svc_queuereq(q, mp);
 843 #ifdef DEBUG
 844                                         /*
 845                                          * Send duplicate if we created one.
 846                                          */
 847                                         if (nmp) {
 848                                                 mutex_enter(&rmp->rm_lock);
 849                                                 rmp->rm_ref++;
 850                                                 mutex_exit(&rmp->rm_lock);
 851                                                 svc_queuereq(q, nmp);
 852                                         }
 853 #endif
 854                                 } else {
 855                                         mutex_exit(&rmp->rm_lock);
 856                                         freemsg(mp);
 857                                 }
 858                                 return;
 859                         default:
 860                                 mutex_exit(&rmp->rm_lock);
 861                                 freemsg(mp);
 862                                 return;
 863                         } /* end switch(rmp->rm_type) */
 864                 } else if (pptr->type == T_UDERROR_IND) {
 865                         mutex_enter(&rmp->rm_lock);
 866                         hdrsz = mp->b_wptr - mp->b_rptr;
 867 
 868                         /*
 869                          * Make sure the header is sane
 870                          */
 871                         if (hdrsz < TUDERRORINDSZ ||
 872                             hdrsz < (pptr->uderror_ind.OPT_length +
 873                             pptr->uderror_ind.OPT_offset) ||
 874                             hdrsz < (pptr->uderror_ind.DEST_length +
 875                             pptr->uderror_ind.DEST_offset)) {
 876                                 mutex_exit(&rmp->rm_lock);
 877                                 freemsg(mp);
 878                                 return;
 879                         }
 880 
 881                         /*
 882                          * In the case where a unit data error has been
 883                          * received, all we need to do is clear the message from
 884                          * the queue.
 885                          */
 886                         mutex_exit(&rmp->rm_lock);
 887                         freemsg(mp);
 888                         RPCLOG(32, "rpcmodrput: unitdata error received at "
 889                             "%ld\n", gethrestime_sec());
 890                         return;
 891                 } /* end else if (pptr->type == T_UDERROR_IND) */
 892 
 893                 putnext(q, mp);
 894                 break;
 895         } /* end switch (mp->b_datap->db_type) */
 896 
 897         TRACE_0(TR_FAC_KRPC, TR_RPCMODRPUT_END,
 898             "rpcmodrput_end:");
 899         /*
 900          * Return codes are not looked at by the STREAMS framework.
 901          */
 902 }
 903 
 904 /*
 905  * write put procedure
 906  */
 907 void
 908 rpcmodwput(queue_t *q, mblk_t *mp)
 909 {
 910         struct rpcm     *rmp;
 911 
 912         ASSERT(q != NULL);
 913 
 914         switch (mp->b_datap->db_type) {
 915                 case M_PROTO:
 916                 case M_PCPROTO:
 917                         break;
 918                 default:
 919                         rpcmodwput_other(q, mp);
 920                         return;
 921         }
 922 
 923         /*
 924          * Check to see if we can send the message downstream.
 925          */
 926         if (canputnext(q)) {
 927                 putnext(q, mp);
 928                 return;
 929         }
 930 
 931         rmp = (struct rpcm *)q->q_ptr;
 932         ASSERT(rmp != NULL);
 933 
 934         /*
 935          * The first canputnext failed.  Try again except this time with the
 936          * lock held, so that we can check the state of the stream to see if
 937          * it is closing.  If either of these conditions evaluate to true
 938          * then send the meesage.
 939          */
 940         mutex_enter(&rmp->rm_lock);
 941         if (canputnext(q) || (rmp->rm_state & RM_CLOSING)) {
 942                 mutex_exit(&rmp->rm_lock);
 943                 putnext(q, mp);
 944         } else {
 945                 /*
 946                  * canputnext failed again and the stream is not closing.
 947                  * Place the message on the queue and let the service
 948                  * procedure handle the message.
 949                  */
 950                 mutex_exit(&rmp->rm_lock);
 951                 (void) putq(q, mp);
 952         }
 953 }
 954 
 955 static void
 956 rpcmodwput_other(queue_t *q, mblk_t *mp)
 957 {
 958         struct rpcm     *rmp;
 959         struct iocblk   *iocp;
 960 
 961         rmp = (struct rpcm *)q->q_ptr;
 962         ASSERT(rmp != NULL);
 963 
 964         switch (mp->b_datap->db_type) {
 965                 case M_IOCTL:
 966                         iocp = (struct iocblk *)mp->b_rptr;
 967                         ASSERT(iocp != NULL);
 968                         switch (iocp->ioc_cmd) {
 969                                 case RPC_CLIENT:
 970                                 case RPC_SERVER:
 971                                         mutex_enter(&rmp->rm_lock);
 972                                         rmp->rm_type = iocp->ioc_cmd;
 973                                         mutex_exit(&rmp->rm_lock);
 974                                         mp->b_datap->db_type = M_IOCACK;
 975                                         qreply(q, mp);
 976                                         return;
 977                                 default:
 978                                 /*
 979                                  * pass the ioctl downstream and hope someone
 980                                  * down there knows how to handle it.
 981                                  */
 982                                         putnext(q, mp);
 983                                         return;
 984                         }
 985                 default:
 986                         break;
 987         }
 988         /*
 989          * This is something we definitely do not know how to handle, just
 990          * pass the message downstream
 991          */
 992         putnext(q, mp);
 993 }
 994 
 995 /*
 996  * Module write service procedure. This is called by downstream modules
 997  * for back enabling during flow control.
 998  */
 999 void
1000 rpcmodwsrv(queue_t *q)
1001 {
1002         struct rpcm     *rmp;
1003         mblk_t          *mp = NULL;
1004 
1005         rmp = (struct rpcm *)q->q_ptr;
1006         ASSERT(rmp != NULL);
1007 
1008         /*
1009          * Get messages that may be queued and send them down stream
1010          */
1011         while ((mp = getq(q)) != NULL) {
1012                 /*
1013                  * Optimize the service procedure for the server-side, by
1014                  * avoiding a call to canputnext().
1015                  */
1016                 if (rmp->rm_type == RPC_SERVER || canputnext(q)) {
1017                         putnext(q, mp);
1018                         continue;
1019                 }
1020                 (void) putbq(q, mp);
1021                 return;
1022         }
1023 }
1024 
1025 static void
1026 rpcmod_release(queue_t *q, mblk_t *bp)
1027 {
1028         struct rpcm *rmp;
1029 
1030         /*
1031          * For now, just free the message.
1032          */
1033         if (bp)
1034                 freemsg(bp);
1035         rmp = (struct rpcm *)q->q_ptr;
1036 
1037         mutex_enter(&rmp->rm_lock);
1038         rmp->rm_ref--;
1039 
1040         if (rmp->rm_ref == 0 && (rmp->rm_state & RM_CLOSING)) {
1041                 cv_broadcast(&rmp->rm_cwait);
1042         }
1043 
1044         mutex_exit(&rmp->rm_lock);
1045 }
1046 
1047 /*
1048  * This part of rpcmod is pushed on a connection-oriented transport for use
1049  * by RPC.  It serves to bypass the Stream head, implements
1050  * the record marking protocol, and dispatches incoming RPC messages.
1051  */
1052 
/* Default idle timer values, in milliseconds */
#define MIR_CLNT_IDLE_TIMEOUT   (5 * (60 * 1000L))      /* 5 minutes */
#define MIR_SVC_IDLE_TIMEOUT    (6 * (60 * 1000L))      /* 6 minutes */
#define MIR_SVC_ORDREL_TIMEOUT  (10 * (60 * 1000L))     /* 10 minutes */
#define MIR_LASTFRAG    0x80000000      /* Record marker */

/*
 * Number of data bytes in message mp: msgdsize() when there is a
 * continuation block, otherwise the length of the single block.
 * The parameter is parenthesized so that arguments such as "p + 1" or
 * "&blk" expand correctly.  Note that mp is evaluated more than once.
 */
#define DLEN(mp)        \
        ((mp)->b_cont ? msgdsize(mp) : ((mp)->b_wptr - (mp)->b_rptr))

/*
 * True when no references are outstanding on the server-side mir and its
 * read service procedure is not running.
 */
#define MIR_SVC_QUIESCED(mir)   \
        ((mir)->mir_ref_cnt == 0 && (mir)->mir_inrservice == 0)

/*
 * Mark the read service procedure as no longer running and, for a server
 * stream that is closing, signal mir_condvar.
 *
 * NOTE(review): this expands to a bare brace block rather than
 * do { } while (0); it is left that way because its callers are not
 * visible from here.
 */
#define MIR_CLEAR_INRSRV(mir_ptr)       {       \
        (mir_ptr)->mir_inrservice = 0;       \
        if ((mir_ptr)->mir_type == RPC_SERVER &&     \
                (mir_ptr)->mir_closing)      \
                cv_signal(&(mir_ptr)->mir_condvar);      \
}

/*
 * Don't block service procedure (and mir_close) if
 * we are in the process of closing.
 */
#define MIR_WCANPUTNEXT(mir_ptr, write_q)       \
        (canputnext(write_q) || ((mir_ptr)->mir_svc_no_more_msgs == 1))
1077 
1078 static int      mir_clnt_dup_request(queue_t *q, mblk_t *mp);
1079 static void     mir_rput_proto(queue_t *q, mblk_t *mp);
1080 static int      mir_svc_policy_notify(queue_t *q, int event);
1081 static void     mir_svc_release(queue_t *wq, mblk_t *mp);
1082 static void     mir_svc_start(queue_t *wq);
1083 static void     mir_svc_idle_start(queue_t *, mir_t *);
1084 static void     mir_svc_idle_stop(queue_t *, mir_t *);
1085 static void     mir_svc_start_close(queue_t *, mir_t *);
1086 static void     mir_clnt_idle_do_stop(queue_t *);
1087 static void     mir_clnt_idle_stop(queue_t *, mir_t *);
1088 static void     mir_clnt_idle_start(queue_t *, mir_t *);
1089 static void     mir_wput(queue_t *q, mblk_t *mp);
1090 static void     mir_wput_other(queue_t *q, mblk_t *mp);
1091 static void     mir_wsrv(queue_t *q);
1092 static  void    mir_disconnect(queue_t *, mir_t *ir);
1093 static  int     mir_check_len(queue_t *, int32_t, mblk_t *);
1094 static  void    mir_timer(void *);
1095 
1096 extern void     (*mir_rele)(queue_t *, mblk_t *);
1097 extern void     (*mir_start)(queue_t *);
1098 extern void     (*clnt_stop_idle)(queue_t *);
1099 
1100 clock_t clnt_idle_timeout = MIR_CLNT_IDLE_TIMEOUT;
1101 clock_t svc_idle_timeout = MIR_SVC_IDLE_TIMEOUT;
1102 
1103 /*
1104  * Timeout for subsequent notifications of idle connection.  This is
1105  * typically used to clean up after a wedged orderly release.
1106  */
1107 clock_t svc_ordrel_timeout = MIR_SVC_ORDREL_TIMEOUT; /* milliseconds */
1108 
1109 extern  uint_t  *clnt_max_msg_sizep;
1110 extern  uint_t  *svc_max_msg_sizep;
1111 uint_t  clnt_max_msg_size = RPC_MAXDATASIZE;
1112 uint_t  svc_max_msg_size = RPC_MAXDATASIZE;
1113 uint_t  mir_krpc_cell_null;
1114 
1115 uint32_t        cb_live = 0;
1116 
1117 /*
1118  * XXXsessions
1119  * This global is used to control which path we take in mir_set_cbinfo
1120  * when we detect the race between nfs41_callback_thread() and
1121  * nfs4_sequence_heartbeat_thread().
1122  * If it's non-zero, we simply print a message.
1123  * If it's zero, we bump cb_live and wait for the condition to clear.
1124  * Unfortunately, we tend to hang during data server recovery.
1125  * The sessions team is working on a fix for this.









































1126  */
1127 int     mir_set_cbinfo_hack = 1;

1128 































































1129 void
1130 mir_set_cbinfo(queue_t *wq, void *info)
1131 {


1132         mir_t   *mir = (mir_t *)wq->q_ptr;
1133         struct  __svccb *scb = mir->mir_cb;
1134 
1135         if (scb != NULL) {
1136                 if (mir_set_cbinfo_hack) {
1137                         /*
1138                          * This is a hack to prevent the hang we get
1139                          * w/DS Recovery
1140                          */
1141                         cmn_err(CE_WARN, "mir_set_cbinfo: scb != NULL");
1142                 } else {
1143                         /*
1144                          * XXX
1145                          * Race condition between nfs41_callback_thread()
1146                          * and nfs4_sequence_heartbeat_thread() is causing
1147                          * us to hit ASSERT, since heartbeat thread reaches
1148                          * this code before r_flags is marked as SVCCB_DEAD.
1149                          *
1150                          * Need to revisit this; NFS layer shouldn't be
1151                          * cv_wait()'ing the RPC layer.
1152                          */
1153                         mutex_enter(&scb->r_lock);
1154                         while (!(scb->r_flags & SVCCB_DEAD)) {
1155                                 cb_live++;
1156                                 cv_wait(&scb->r_cbwait, &scb->r_lock);
1157                         }
1158                         mutex_exit(&scb->r_lock);
1159                         kmem_free(scb, sizeof (*scb));
1160                         mir->mir_cb = NULL;
1161                 }
1162         }
1163         mir->mir_cb = (void *)info;











1164 }
1165 
1166 void
1167 mir_clear_cbinfo(queue_t *wq)
1168 {
1169         mir_t   *mir = (mir_t *)wq->q_ptr;
1170         struct __svccb  *scb = mir->mir_cb;
1171 
1172         if (scb == NULL)



1173                 return;
1174 
1175         if (scb->r_flags & SVCCB_DEAD) {
1176                 kmem_free(scb, sizeof (*scb));
1177         }

1178         mir->mir_cb = NULL;












1179 }
1180 
1181 void
1182 mir_check_cb(void *handlecb, queue_t *wq)
1183 {
1184         ASSERT(handlecb == ((mir_t *)wq->q_ptr)->mir_cb);
1185 }
1186 
1187 
1188 SVCCB *
1189 mir_get_svccb(queue_t *wq)
1190 {
1191         mir_t   *mir;
1192         mir = (mir_t *)wq->q_ptr;
1193         return ((SVCCB *)mir->mir_cb);
1194 }
1195 
1196 static void
1197 mir_timer_stop(mir_t *mir)
1198 {
1199         timeout_id_t tid;
1200 
1201         ASSERT(MUTEX_HELD(&mir->mir_mutex));
1202 
1203         /*
1204          * Since the mir_mutex lock needs to be released to call
1205          * untimeout(), we need to make sure that no other thread
1206          * can start/stop the timer (changing mir_timer_id) during
1207          * that time.  The mir_timer_call bit and the mir_timer_cv
1208          * condition variable are used to synchronize this.  Setting
1209          * mir_timer_call also tells mir_timer() (refer to the comments
1210          * in mir_timer()) that it does not need to do anything.
1211          */
1212         while (mir->mir_timer_call)
1213                 cv_wait(&mir->mir_timer_cv, &mir->mir_mutex);
1214         mir->mir_timer_call = B_TRUE;
1215 
1216         if ((tid = mir->mir_timer_id) != 0) {
1217                 mir->mir_timer_id = 0;
1218                 mutex_exit(&mir->mir_mutex);
1219                 (void) untimeout(tid);
1220                 mutex_enter(&mir->mir_mutex);
1221         }
1222         mir->mir_timer_call = B_FALSE;
1223         cv_broadcast(&mir->mir_timer_cv);
1224 }
1225 
1226 static void
1227 mir_timer_start(queue_t *q, mir_t *mir, clock_t intrvl)
1228 {
1229         timeout_id_t tid;
1230 
1231         ASSERT(MUTEX_HELD(&mir->mir_mutex));
1232 
1233         while (mir->mir_timer_call)
1234                 cv_wait(&mir->mir_timer_cv, &mir->mir_mutex);
1235         mir->mir_timer_call = B_TRUE;
1236 
1237         if ((tid = mir->mir_timer_id) != 0) {
1238                 mutex_exit(&mir->mir_mutex);
1239                 (void) untimeout(tid);
1240                 mutex_enter(&mir->mir_mutex);
1241         }
1242         /* Only start the timer when it is not closing. */
1243         if (!mir->mir_closing) {
1244                 mir->mir_timer_id = timeout(mir_timer, q,
1245                     MSEC_TO_TICK(intrvl));
1246         }
1247         mir->mir_timer_call = B_FALSE;
1248         cv_broadcast(&mir->mir_timer_cv);
1249 }
1250 
1251 static int
1252 mir_clnt_dup_request(queue_t *q, mblk_t *mp)
1253 {
1254         mblk_t  *mp1;
1255         uint32_t  new_xid;
1256         uint32_t  old_xid;
1257 
1258         ASSERT(MUTEX_HELD(&((mir_t *)q->q_ptr)->mir_mutex));
1259         new_xid = BE32_TO_U32(&mp->b_rptr[4]);
1260         /*
1261          * This loop is a bit tacky -- it walks the STREAMS list of
1262          * flow-controlled messages.
1263          */
1264         if ((mp1 = q->q_first) != NULL) {
1265                 do {
1266                         old_xid = BE32_TO_U32(&mp1->b_rptr[4]);
1267                         if (new_xid == old_xid)
1268                                 return (1);
1269                 } while ((mp1 = mp1->b_next) != NULL);
1270         }
1271         return (0);
1272 }
1273 
1274 static int
1275 mir_close(queue_t *q)
1276 {
1277         mir_t   *mir = q->q_ptr;
1278         mblk_t  *mp;
1279         bool_t queue_cleaned = FALSE;
1280 
1281         RPCLOG(32, "rpcmod: mir_close of q 0x%p\n", (void *)q);
1282         ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
1283         mutex_enter(&mir->mir_mutex);
1284         if ((mp = mir->mir_head_mp) != NULL) {
1285                 mir->mir_head_mp = NULL;
1286                 mir->mir_tail_mp = NULL;
1287                 freemsg(mp);
1288         }
1289         /*
1290          * Set mir_closing so we get notified when MIR_SVC_QUIESCED()
1291          * is TRUE.  And mir_timer_start() won't start the timer again.
1292          */
1293         mir->mir_closing = B_TRUE;
1294         mir_timer_stop(mir);
1295 
1296         if (mir->mir_type == RPC_SERVER) {
1297                 flushq(q, FLUSHDATA);   /* Ditch anything waiting on read q */
1298 
1299                 /*
1300                  * This will prevent more requests from arriving and
1301                  * will force rpcmod to ignore flow control.
1302                  */
1303                 mir_svc_start_close(WR(q), mir);
1304 
1305                 while ((!MIR_SVC_QUIESCED(mir)) || mir->mir_inwservice == 1) {
1306 
1307                         if (mir->mir_ref_cnt && !mir->mir_inrservice &&
1308                             (queue_cleaned == FALSE)) {
1309                                 /*
1310                                  * call into SVC to clean the queue
1311                                  */
1312                                 mutex_exit(&mir->mir_mutex);
1313                                 svc_queueclean(q);
1314                                 queue_cleaned = TRUE;
1315                                 mutex_enter(&mir->mir_mutex);
1316                                 continue;
1317                         }
1318 
1319                         /*
1320                          * Bugid 1253810 - Force the write service
1321                          * procedure to send its messages, regardless
1322                          * whether the downstream  module is ready
1323                          * to accept data.
1324                          */
1325                         if (mir->mir_inwservice == 1)
1326                                 qenable(WR(q));
1327 
1328                         cv_wait(&mir->mir_condvar, &mir->mir_mutex);
1329                 }
1330 
1331                 mutex_exit(&mir->mir_mutex);
1332                 /*
1333                  * Destroy the cm_entry
1334                  */
1335                 connmgr_cb_destroy(WR(q));
1336                 qprocsoff(q);
1337 
1338                 /* Notify KRPC that this stream is going away. */
1339                 svc_queueclose(q);
1340         } else {
1341                 mutex_exit(&mir->mir_mutex);
1342                 qprocsoff(q);
1343         }
1344 
1345         mutex_destroy(&mir->mir_mutex);
1346         cv_destroy(&mir->mir_condvar);
1347         cv_destroy(&mir->mir_timer_cv);
1348         kmem_free(mir, sizeof (mir_t));
1349         return (0);
1350 }
1351 
1352 /*
1353  * This is server side only (RPC_SERVER).
1354  *
1355  * Exit idle mode.
1356  */
1357 static void
1358 mir_svc_idle_stop(queue_t *q, mir_t *mir)
1359 {
1360         ASSERT(MUTEX_HELD(&mir->mir_mutex));
1361         ASSERT((q->q_flag & QREADR) == 0);
1362         ASSERT(mir->mir_type == RPC_SERVER);
1363         RPCLOG(16, "rpcmod: mir_svc_idle_stop of q 0x%p\n", (void *)q);
1364 
1365         mir_timer_stop(mir);
1366 }
1367 
1368 /*
1369  * This is server side only (RPC_SERVER).
1370  *
1371  * Start idle processing, which will include setting idle timer if the
1372  * stream is not being closed.
1373  */
1374 static void
1375 mir_svc_idle_start(queue_t *q, mir_t *mir)
1376 {
1377         ASSERT(MUTEX_HELD(&mir->mir_mutex));
1378         ASSERT((q->q_flag & QREADR) == 0);
1379         ASSERT(mir->mir_type == RPC_SERVER);
1380         RPCLOG(16, "rpcmod: mir_svc_idle_start q 0x%p\n", (void *)q);
1381 
1382         /*
1383          * Don't re-start idle timer if we are closing queues.
1384          */
1385         if (mir->mir_closing) {
1386                 RPCLOG(16, "mir_svc_idle_start - closing: 0x%p\n",
1387                     (void *)q);
1388 
1389                 /*
1390                  * We will call mir_svc_idle_start() whenever MIR_SVC_QUIESCED()
1391                  * is true.  When it is true, and we are in the process of
1392                  * closing the stream, signal any thread waiting in
1393                  * mir_close().
1394                  */
1395                 if (mir->mir_inwservice == 0)
1396                         cv_signal(&mir->mir_condvar);
1397 
1398         } else {
1399                 RPCLOG(16, "mir_svc_idle_start - reset %s timer\n",
1400                     mir->mir_ordrel_pending ? "ordrel" : "normal");
1401                 /*
1402                  * Normal condition, start the idle timer.  If an orderly
1403                  * release has been sent, set the timeout to wait for the
1404                  * client to close its side of the connection.  Otherwise,
1405                  * use the normal idle timeout.
1406                  */
1407                 mir_timer_start(q, mir, mir->mir_ordrel_pending ?
1408                     svc_ordrel_timeout : mir->mir_idle_timeout);
1409         }
1410 }
1411 
1412 /* ARGSUSED */
1413 static int
1414 mir_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *credp)
1415 {
1416         mir_t   *mir;
1417 
1418         RPCLOG(32, "rpcmod: mir_open of q 0x%p\n", (void *)q);
1419         /* Set variables used directly by KRPC. */
1420         if (!mir_rele)
1421                 mir_rele = mir_svc_release;
1422         if (!mir_start)
1423                 mir_start = mir_svc_start;
1424         if (!clnt_stop_idle)
1425                 clnt_stop_idle = mir_clnt_idle_do_stop;
1426         if (!clnt_max_msg_sizep)
1427                 clnt_max_msg_sizep = &clnt_max_msg_size;
1428         if (!svc_max_msg_sizep)
1429                 svc_max_msg_sizep = &svc_max_msg_size;
1430 
1431         /* Allocate a zero'ed out mir structure for this stream. */
1432         mir = kmem_zalloc(sizeof (mir_t), KM_SLEEP);
1433 
1434         /*
1435          * We set hold inbound here so that incoming messages will
1436          * be held on the read-side queue until the stream is completely
1437          * initialized with a RPC_CLIENT or RPC_SERVER ioctl.  During
1438          * the ioctl processing, the flag is cleared and any messages that
1439          * arrived between the open and the ioctl are delivered to KRPC.
1440          *
1441          * Early data should never arrive on a client stream since
1442          * servers only respond to our requests and we do not send any.
1443          * until after the stream is initialized.  Early data is
1444          * very common on a server stream where the client will start
1445          * sending data as soon as the connection is made (and this
1446          * is especially true with TCP where the protocol accepts the
1447          * connection before nfsd or KRPC is notified about it).
1448          */
1449 
1450         mir->mir_hold_inbound = 1;
1451 
1452         /*
1453          * Start the record marker looking for a 4-byte header.  When
1454          * this length is negative, it indicates that rpcmod is looking
1455          * for bytes to consume for the record marker header.  When it
1456          * is positive, it holds the number of bytes that have arrived
1457          * for the current fragment and are being held in mir_header_mp.
1458          */
1459 
1460         mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
1461 
1462         mir->mir_zoneid = rpc_zoneid();
1463         mutex_init(&mir->mir_mutex, NULL, MUTEX_DEFAULT, NULL);
1464         cv_init(&mir->mir_condvar, NULL, CV_DRIVER, NULL);
1465         cv_init(&mir->mir_timer_cv, NULL, CV_DRIVER, NULL);
1466 
1467         q->q_ptr = (char *)mir;
1468         WR(q)->q_ptr = (char *)mir;
1469 
1470         /*
1471          * We noenable the read-side queue because we don't want it
1472          * automatically enabled by putq.  We enable it explicitly
1473          * in mir_wsrv when appropriate. (See additional comments on
1474          * flow control at the beginning of mir_rsrv.)
1475          */
1476         noenable(q);
1477 
1478         qprocson(q);
1479         return (0);
1480 }
1481 void
1482 mir_queue_rele(queue_t *q)
1483 {
1484         mir_t   *mir;
1485 
1486         ASSERT(q != NULL);
1487         mir = (mir_t *)q->q_ptr;
1488         ASSERT(mir != NULL);
1489 
1490         mutex_enter(&mir->mir_mutex);
1491         mir->mir_ref_cnt--;
1492         mutex_exit(&mir->mir_mutex);
1493 }
1494 
1495 void
1496 mir_queue_hold(queue_t *q)
1497 {
1498         mir_t   *mir;
1499 
1500         ASSERT(q != NULL);
1501         mir = (mir_t *)q->q_ptr;
1502         ASSERT(mir != NULL);
1503 
1504         mutex_enter(&mir->mir_mutex);
1505         mir->mir_ref_cnt++;
1506         mutex_exit(&mir->mir_mutex);
1507 }
1508 
1509 /*
1510  * Copy out the RPC transaction id and RPC Direction
1511  * from the mblk chain. Leave the mblk intact.
1512  */
1513 bool_t
1514 mir_dir_xid(mblk_t *mp, uint32_t *dir, uint32_t *xid)
1515 {
1516         unsigned char   *p;
1517         unsigned char   *rptr;
1518         mblk_t          *tmp;
1519         int              i, get_rpcdir;
1520         uint32_t         d_tmp = 0;
1521 
1522         /*
1523          * If we can just grab the XID and RPC direction flag great.
1524          */
1525         if ((IS_P2ALIGNED(mp->b_rptr, (sizeof (uint64_t)))) &&
1526             (mp->b_wptr - mp->b_rptr) >= (sizeof (uint64_t))) {
1527                 *xid = *((uint32_t *)mp->b_rptr);
1528                 *dir = ntohl(*((uint32_t *)(mp->b_rptr + sizeof (uint32_t))));
1529                 return (TRUE);
1530         }
1531 
1532         /*
1533          * Otherwise we need to copy byte-by-byte
1534          */
1535         DTRACE_PROBE(krpc__i__bytecopy);
1536 
1537         i = get_rpcdir = 0;
1538         p = (unsigned char *)xid;
1539         tmp = mp;
1540 
1541         /*
1542          * While we have not exhausted the entire mblk chain:
1543          * copy the first sizeof uint32_t value into xid, and
1544          * then the second sizeof uint32_t value into a temporary
1545          * so that we can convert from network byte order.
1546          *
1547          * Should we exhaust the entire mblk chain in attempting
1548          * to do this, return FALSE.
1549          */
1550         while (tmp) {
1551                 rptr = tmp->b_rptr;
1552                 while (rptr < tmp->b_wptr) {
1553                         *p++ = *rptr++;
1554                         /*
1555                          * Have we collected enough bytes for
1556                          * a uint32_t ?
1557                          */
1558                         if (++i == sizeof (uint32_t)) {
1559                                 /*
1560                                  * If yes, do we need to switch to
1561                                  * RPC Direction or are we all done ?
1562                                  */
1563                                 if (get_rpcdir) {
1564                                         /* Got it all */
1565                                         *dir = ntohl(d_tmp);
1566                                         return (TRUE);
1567                                 }
1568                                 /* start to collect RPC Direction */
1569                                 get_rpcdir++;
1570                                 i = 0;
1571                                 p = (unsigned char *)&d_tmp;
1572                         }
1573                 }
1574                 tmp = tmp->b_cont;
1575         }
1576 
1577         /* We didn't get both of them.. */
1578         DTRACE_PROBE(krpc__e__mblk_exhausted);
1579         return (FALSE);
1580 }
1581 
1582 /*
1583  * Read-side put routine for both the client and server side.  Does the
1584  * record marking for incoming RPC messages, and when complete, dispatches
1585  * the message to either the client or server.
1586  */
1587 static void
1588 mir_rput(queue_t *q, mblk_t *mp)
1589 {
1590         int     excess;
1591         int32_t frag_len, frag_header;
1592         mblk_t  *cont_mp, *head_mp, *tail_mp, *mp1;
1593         mir_t   *mir = q->q_ptr;
1594         boolean_t stop_timer = B_FALSE;
1595         uint32_t        xid;
1596         uint32_t        dir;
1597 
1598         ASSERT(mir != NULL);
1599 
1600         /*
1601          * If the stream has not been set up as a RPC_CLIENT or RPC_SERVER
1602          * with the corresponding ioctl, then don't accept
1603          * any inbound data.  This should never happen for streams
1604          * created by nfsd or client-side KRPC because they are careful
1605          * to set the mode of the stream before doing anything else.
1606          */
1607         if (mir->mir_type == 0) {
1608                 freemsg(mp);
1609                 return;
1610         }
1611 
1612         ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
1613 
1614         switch (mp->b_datap->db_type) {
1615         case M_DATA:
1616                 break;
1617         case M_PROTO:
1618         case M_PCPROTO:
1619                 if (MBLKL(mp) < sizeof (t_scalar_t)) {
1620                         RPCLOG(1, "mir_rput: runt TPI message (%d bytes)\n",
1621                             (int)MBLKL(mp));
1622                         freemsg(mp);
1623                         return;
1624                 }
1625                 if (((union T_primitives *)mp->b_rptr)->type != T_DATA_IND) {
1626                         mir_rput_proto(q, mp);
1627                         return;
1628                 }
1629 
1630                 /* Throw away the T_DATA_IND block and continue with data. */
1631                 mp1 = mp;
1632                 mp = mp->b_cont;
1633                 freeb(mp1);
1634                 break;
1635         case M_SETOPTS:
1636                 /*
1637                  * If a module on the stream is trying set the Stream head's
1638                  * high water mark, then set our hiwater to the requested
1639                  * value.  We are the "stream head" for all inbound
1640                  * data messages since messages are passed directly to KRPC.
1641                  */
1642                 if (MBLKL(mp) >= sizeof (struct stroptions)) {
1643                         struct stroptions       *stropts;
1644 
1645                         stropts = (struct stroptions *)mp->b_rptr;
1646                         if ((stropts->so_flags & SO_HIWAT) &&
1647                             !(stropts->so_flags & SO_BAND)) {
1648                                 (void) strqset(q, QHIWAT, 0, stropts->so_hiwat);
1649                         }
1650                 }
1651                 putnext(q, mp);
1652                 return;
1653         case M_FLUSH:
1654                 RPCLOG(32, "mir_rput: ignoring M_FLUSH %x ", *mp->b_rptr);
1655                 RPCLOG(32, "on q 0x%p\n", (void *)q);
1656                 putnext(q, mp);
1657                 return;
1658         default:
1659                 putnext(q, mp);
1660                 return;
1661         }
1662 
1663         mutex_enter(&mir->mir_mutex);
1664 
1665         /*
1666          * If this connection is closing, don't accept any new messages.
1667          */
1668         if (mir->mir_svc_no_more_msgs) {
1669                 ASSERT(mir->mir_type == RPC_SERVER);
1670                 mutex_exit(&mir->mir_mutex);
1671                 freemsg(mp);
1672                 return;
1673         }
1674 
1675         /* Get local copies for quicker access. */
1676         frag_len = mir->mir_frag_len;
1677         frag_header = mir->mir_frag_header;
1678         head_mp = mir->mir_head_mp;
1679         tail_mp = mir->mir_tail_mp;
1680 
1681         /* Loop, processing each message block in the mp chain separately. */
1682         do {
1683                 cont_mp = mp->b_cont;
1684                 mp->b_cont = NULL;
1685 
1686                 /*
1687                  * Drop zero-length mblks to prevent unbounded kernel memory
1688                  * consumption.
1689                  */
1690                 if (MBLKL(mp) == 0) {
1691                         freeb(mp);
1692                         continue;
1693                 }
1694 
1695                 /*
1696                  * If frag_len is negative, we're still in the process of
1697                  * building frag_header -- try to complete it with this mblk.
1698                  */
1699                 while (frag_len < 0 && mp->b_rptr < mp->b_wptr) {
1700                         frag_len++;
1701                         frag_header <<= 8;
1702                         frag_header += *mp->b_rptr++;
1703                 }
1704 
1705                 if (MBLKL(mp) == 0 && frag_len < 0) {
1706                         /*
1707                          * We consumed this mblk while trying to complete the
1708                          * fragment header.  Free it and move on.
1709                          */
1710                         freeb(mp);
1711                         continue;
1712                 }
1713 
1714                 ASSERT(frag_len >= 0);
1715 
1716                 /*
1717                  * Now frag_header has the number of bytes in this fragment
1718                  * and we're just waiting to collect them all.  Chain our
1719                  * latest mblk onto the list and see if we now have enough
1720                  * bytes to complete the fragment.
1721                  */
1722                 if (head_mp == NULL) {
1723                         ASSERT(tail_mp == NULL);
1724                         head_mp = tail_mp = mp;
1725                 } else {
1726                         tail_mp->b_cont = mp;
1727                         tail_mp = mp;
1728                 }
1729 
1730                 frag_len += MBLKL(mp);
1731                 excess = frag_len - (frag_header & ~MIR_LASTFRAG);
1732                 if (excess < 0) {
1733                         /*
1734                          * We still haven't received enough data to complete
1735                          * the fragment, so continue on to the next mblk.
1736                          */
1737                         continue;
1738                 }
1739 
1740                 /*
1741                  * We've got a complete fragment.  If there are excess bytes,
1742                  * then they're part of the next fragment's header (of either
1743                  * this RPC message or the next RPC message).  Split that part
1744                  * into its own mblk so that we can safely freeb() it when
1745                  * building frag_header above.
1746                  */
1747                 if (excess > 0) {
1748                         if ((mp1 = dupb(mp)) == NULL &&
1749                             (mp1 = copyb(mp)) == NULL) {
1750                                 freemsg(head_mp);
1751                                 freemsg(cont_mp);
1752                                 RPCLOG0(1, "mir_rput: dupb/copyb failed\n");
1753                                 mir->mir_frag_header = 0;
1754                                 mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
1755                                 mir->mir_head_mp = NULL;
1756                                 mir->mir_tail_mp = NULL;
1757                                 mir_disconnect(q, mir); /* drops mir_mutex */
1758                                 return;
1759                         }
1760 
1761                         /*
1762                          * Relink the message chain so that the next mblk is
1763                          * the next fragment header, followed by the rest of
1764                          * the message chain.
1765                          */
1766                         mp1->b_cont = cont_mp;
1767                         cont_mp = mp1;
1768 
1769                         /*
1770                          * Data in the new mblk begins at the next fragment,
1771                          * and data in the old mblk ends at the next fragment.
1772                          */
1773                         mp1->b_rptr = mp1->b_wptr - excess;
1774                         mp->b_wptr -= excess;
1775                 }
1776 
1777                 /*
1778                  * Reset frag_len and frag_header for the next fragment.
1779                  */
1780                 frag_len = -(int32_t)sizeof (uint32_t);
1781                 if (!(frag_header & MIR_LASTFRAG)) {
1782                         /*
1783                          * The current fragment is complete, but more
1784                          * fragments need to be processed before we can
1785                          * pass along the RPC message headed at head_mp.
1786                          */
1787                         frag_header = 0;
1788                         continue;
1789                 }
1790                 frag_header = 0;
1791 
1792                 /*
1793                  * Get msg direction and handle to the appropriate ctxt
1794                  */
1795                 if (!mir_dir_xid(head_mp, &dir, &xid)) {
1796                         /* XXX - if we can't get the dir, we're hosed */
1797                         mutex_exit(&mir->mir_mutex);
1798                         freemsg(head_mp);
1799                         return;
1800                 }
1801 
1802                 /*
1803                  * We've got a complete RPC message; pass it to the
1804                  * appropriate consumer.
1805                  */
1806                 switch (mir->mir_type) {
1807                 case RPC_CLIENT:
1808                         switch (dir) {
1809                         case REPLY:
1810                                 if (clnt_dispatch_notify(head_mp,
1811                                     mir->mir_zoneid, xid)) {
1812                                         /*
1813                                          * Mark this stream as active.  This marker
1814                                          * is used in mir_timer().
1815                                          */
1816                                         mir->mir_clntreq = 1;
1817                                         mir->mir_use_timestamp = lbolt;
1818                                 } else
1819                                         freemsg(head_mp);
1820                                 break;
1821 
1822                         case CALL:

1823                         default:
1824                         {
1825                                 SVCCB   *svccb;
1826                                 ASSERT(dir == CALL);
1827 
1828                                 svccb = (SVCCB *)mir->mir_cb;
1829                                 if (svccb->r_flags & SVCCB_DEAD) {
1830                                         zcmn_err(getzoneid(), CE_NOTE,
1831                                             "Callback On Dead Session"
1832                                             "%p %p", (void *)mir,
1833                                             (void *)svccb);
1834                                 } else {
1835                                         svccb->r_mp = head_mp;
1836                                         cv_signal(&svccb->r_cbwait);
1837                                         break;

1838                                 }



1839                         }


1840 
1841                         }
1842                         break;
1843 
1844                 case RPC_SERVER:
1845                         switch (dir) {
1846                         case REPLY:
1847                                 /*
1848                                  * RPC Server initiated a Callback RPC and
1849                                  * is receiving a reply from the RPC Client.
1850                                  */
1851                                 if (clnt_dispatch_notify(head_mp,
1852                                     global_zone->zone_id, xid)) {
1853                                         /*
1854                                          * Mark this stream as active.
1855                                          * This marker is used in mir_timer().
1856                                          */
1857                                         mir->mir_clntreq = 0;
1858                                 } else
1859                                         freemsg(head_mp);
1860                                 break;
1861 
1862                         case CALL:
1863                         default:
1864                                 /*
1865                                  * Check for flow control before
1866                                  * passing the message to KRPC.
1867                                  */
1868                                 if (!mir->mir_hold_inbound) {
1869                                     if (mir->mir_krpc_cell) {
1870                                         /*
1871                                          * If the reference count is 0
1872                                          * (not including this request),
1873                                          * then the stream is transitioning
1874                                          * from idle to non-idle. In this case,
1875                                          * we cancel the idle timer.
1876                                          */
1877                                         if (mir->mir_ref_cnt++ == 0)
1878                                                 stop_timer = B_TRUE;
1879                                         if (mir_check_len(q,
1880                                             (int32_t)msgdsize(mp), mp))
1881                                                 return;
1882                                         svc_queuereq(q, head_mp); /* to KRPC */
1883                                     } else {
1884                                         /*
1885                                          * Count # of times this happens.
1886                                          * Should be never, but experience
1887                                          * shows otherwise.
1888                                          */
1889                                         mir_krpc_cell_null++;
1890                                         freemsg(head_mp);
1891                                     }
1892 
1893                                 } else {
1894                                         /*
1895                                          * If the outbound side of the stream
1896                                          * is flow controlled, then hold this
1897                                          * message until client catches up.
1898                                          * mir_hold_inbound is set in mir_wput
1899                                          * and cleared in mir_wsrv.
1900                                          */
1901                                         (void) putq(q, head_mp);
1902                                         mir->mir_inrservice = B_TRUE;
1903                                 }
1904                                 break;
1905                         }
1906                         break; /* RPC_SERVER */
1907 
1908                 default:
1909                         RPCLOG(1, "mir_rput: unknown mir_type %d\n",
1910                             mir->mir_type);
1911                         freemsg(head_mp);
1912                         break;
1913                 }
1914 
1915                 /*
1916                  * Reset the chain since we're starting on a new RPC message.
1917                  */
1918                 head_mp = tail_mp = NULL;
1919         } while ((mp = cont_mp) != NULL);
1920 
1921         /*
1922          * Sanity check the message length; if it's too large mir_check_len()
1923          * will shutdown the connection, drop mir_mutex, and return non-zero.
1924          */
1925         if (head_mp != NULL && mir->mir_setup_complete &&
1926             mir_check_len(q, frag_len, head_mp))
1927                 return;
1928 
1929         /* Save our local copies back in the mir structure. */
1930         mir->mir_frag_header = frag_header;
1931         mir->mir_frag_len = frag_len;
1932         mir->mir_head_mp = head_mp;
1933         mir->mir_tail_mp = tail_mp;
1934 
1935         /*
1936          * The timer is stopped after the whole message chain is processed.
1937          * The reason is that stopping the timer releases the mir_mutex
1938          * lock temporarily.  This means that the request can be serviced
1939          * while we are still processing the message chain.  This is not
1940          * good.  So we stop the timer here instead.
1941          *
1942          * Note that if the timer fires before we stop it, it will not
1943          * do any harm as MIR_SVC_QUIESCED() is false and mir_timer()
1944          * will just return.
1945          */
1946         if (stop_timer) {
1947                 RPCLOG(16, "mir_rput: stopping idle timer on 0x%p because "
1948                     "ref cnt going to non zero\n", (void *)WR(q));
1949                 mir_svc_idle_stop(WR(q), mir);
1950         }
1951         mutex_exit(&mir->mir_mutex);
1952 }
1953 
/*
 * Read-side handler for TPI control messages.  mir_rput() passes every
 * M_PROTO/M_PCPROTO message whose primitive is not T_DATA_IND to this
 * routine.  mir_mutex must not be held on entry.
 *
 * RPC_CLIENT streams:
 *   T_DISCON_IND, T_ORDREL_IND - free any partially assembled record,
 *       restart the idle timer, report the event with
 *       clnt_dispatch_notifyall() and free mp.
 *   T_ERROR_ACK, T_OK_ACK - if the ack is for a T_DISCON_REQ, report it
 *       with clnt_dispatch_notifyall() and free mp; otherwise offer mp
 *       to clnt_dispatch_notifyconn().
 *   T_CONN_CON, T_INFO_ACK, T_OPTMGMT_ACK - offer mp to
 *       clnt_dispatch_notifyconn().
 *
 * RPC_SERVER streams:
 *   T_BIND_ACK - a CONIND_number greater than zero marks the stream as a
 *       listener and stops its idle timer.
 *   T_DISCON_IND, T_ORDREL_IND - start closing the connection; the
 *       indication is held in mir_svc_pend_mp unless the stream is a
 *       listener or is already quiesced.
 *
 * Every message that is not consumed above is passed upstream with
 * putnext().
 */
1954 static void
1955 mir_rput_proto(queue_t *q, mblk_t *mp)
1956 {
1957         mir_t   *mir = (mir_t *)q->q_ptr;
1958         uint32_t        type;
1959         uint32_t reason = 0;    /* only T_DISCON_IND supplies a reason */
1960 
1961         ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
1962 
             /*
              * NOTE(review): mir_rput() only verifies that the message holds a
              * t_scalar_t before calling this routine; the larger TPI
              * structures dereferenced below are not length-checked here --
              * confirm that the transport guarantees their size.
              */
1963         type = ((union T_primitives *)mp->b_rptr)->type;
1964         switch (mir->mir_type) {
1965         case RPC_CLIENT:
1966                 switch (type) {
1967                 case T_DISCON_IND:
1968                         reason = ((struct T_discon_ind *)
1969                             (mp->b_rptr))->DISCON_reason;
1970                         /*FALLTHROUGH*/
1971                 case T_ORDREL_IND:
1972                         mutex_enter(&mir->mir_mutex);
                             /* Discard any partially assembled RPC record. */
1973                         if (mir->mir_head_mp) {
1974                                 freemsg(mir->mir_head_mp);
1975                                 mir->mir_head_mp = (mblk_t *)0;
1976                                 mir->mir_tail_mp = (mblk_t *)0;
1977                         }
1978                         /*
1979                          * We are disconnecting, but not necessarily
1980                          * closing. By not closing, we will fail to
1981                          * pick up a possibly changed global timeout value,
1982                          * unless we store it now.
1983                          */
1984                         mir->mir_idle_timeout = clnt_idle_timeout;
1985                         mir_clnt_idle_stop(WR(q), mir);
1986 
1987                         /*
1988                          * Even though we are unconnected, we still
1989                          * leave the idle timer going on the client. The
1990                          * reason is that if we've disconnected due
1991                          * to a server-side disconnect, reset, or connection
1992                          * timeout, there is a possibility the client may
1993                          * retry the RPC request. This retry needs to be done on
1994                          * the same bound address for the server to interpret
1995                          * it as such. However, we don't want
1996                          * to wait forever for that possibility. If the
1997                          * end-point stays unconnected for mir_idle_timeout
1998                          * units of time, then that is a signal to the
1999                          * connection manager to give up waiting for the
2000                          * application (eg. NFS) to send a retry.
2001                          */
2002                         mir_clnt_idle_start(WR(q), mir);
2003                         mutex_exit(&mir->mir_mutex);
2004                         clnt_dispatch_notifyall(WR(q), type, reason);
2005                         freemsg(mp);
2006                         return;
2007                 case T_ERROR_ACK:
2008                 {
2009                         struct T_error_ack      *terror;
2010 
2011                         terror = (struct T_error_ack *)mp->b_rptr;
2012                         RPCLOG(1, "mir_rput_proto T_ERROR_ACK for queue 0x%p",
2013                             (void *)q);
2014                         RPCLOG(1, " ERROR_prim: %s,",
2015                             rpc_tpiprim2name(terror->ERROR_prim));
2016                         RPCLOG(1, " TLI_error: %s,",
2017                             rpc_tpierr2name(terror->TLI_error));
2018                         RPCLOG(1, " UNIX_error: %d\n", terror->UNIX_error);
2019                         if (terror->ERROR_prim == T_DISCON_REQ)  {
2020                                 clnt_dispatch_notifyall(WR(q), type, reason);
2021                                 freemsg(mp);
2022                                 return;
2023                         } else {
                                     /*
                                      * A non-zero return presumably means the
                                      * message was consumed -- TODO confirm.
                                      * Otherwise it is passed upstream below.
                                      */
2024                                 if (clnt_dispatch_notifyconn(WR(q), mp))
2025                                         return;
2026                         }
2027                         break;
2028                 }
2029                 case T_OK_ACK:
2030                 {
2031                         struct T_ok_ack *tok = (struct T_ok_ack *)mp->b_rptr;
2032 
2033                         if (tok->CORRECT_prim == T_DISCON_REQ) {
2034                                 clnt_dispatch_notifyall(WR(q), type, reason);
2035                                 freemsg(mp);
2036                                 return;
2037                         } else {
2038                                 if (clnt_dispatch_notifyconn(WR(q), mp))
2039                                         return;
2040                         }
2041                         break;
2042                 }
2043                 case T_CONN_CON:
2044                 case T_INFO_ACK:
2045                 case T_OPTMGMT_ACK:
2046                         if (clnt_dispatch_notifyconn(WR(q), mp))
2047                                 return;
2048                         break;
2049                 case T_BIND_ACK:
2050                         break;
2051                 default:
2052                         RPCLOG(1, "mir_rput: unexpected message %d "
2053                             "for KRPC client\n",
2054                             ((union T_primitives *)mp->b_rptr)->type);
2055                         break;
2056                 }
2057                 break;
2058 
2059         case RPC_SERVER:
2060                 switch (type) {
2061                 case T_BIND_ACK:
2062                 {
2063                         struct T_bind_ack       *tbind;
2064 
2065                         /*
2066                          * If this is a listening stream, then shut
2067                          * off the idle timer.
2068                          */
2069                         tbind = (struct T_bind_ack *)mp->b_rptr;
2070                         if (tbind->CONIND_number > 0) {
2071                                 mutex_enter(&mir->mir_mutex);
2072                                 mir_svc_idle_stop(WR(q), mir);
2073 
2074                                 /*
2075                                  * mark this as a listen endpoint
2076                                  * for special handling.
2077                                  */
2078 
2079                                 mir->mir_listen_stream = 1;
2080                                 mutex_exit(&mir->mir_mutex);
2081                         }
2082                         break;
2083                 }
2084                 case T_DISCON_IND:
2085                 case T_ORDREL_IND:
2086                         RPCLOG(16, "mir_rput_proto: got %s indication\n",
2087                             type == T_DISCON_IND ? "disconnect"
2088                             : "orderly release");
2089 
2090                         /*
2091                          * For listen endpoint just pass
2092                          * on the message.
2093                          */
2094 
                             /*
                              * NOTE(review): mir_listen_stream is read here
                              * without mir_mutex, although it is set under the
                              * mutex in the T_BIND_ACK case -- confirm that
                              * this is intentional.
                              */
2095                         if (mir->mir_listen_stream)
2096                                 break;
2097 
2098 
2099                         mutex_enter(&mir->mir_mutex);
2100 
2101                         /*
2102                          * If client wants to break off connection, record
2103                          * that fact.
2104                          */
2105                         mir_svc_start_close(WR(q), mir);
2106 
2107                         /*
2108                          * If we are idle, then send the orderly release
2109                          * or disconnect indication to nfsd.
2110                          */
2111                         if (MIR_SVC_QUIESCED(mir)) {
2112                                 mutex_exit(&mir->mir_mutex);
2113                                 break;
2114                         }
2115 
2116                         RPCLOG(16, "mir_rput_proto: not idle, so "
2117                             "disconnect/ord rel indication not passed "
2118                             "upstream on 0x%p\n", (void *)q);
2119 
2120                         /*
2121                          * Hold the indication until we get idle.
2122                          * If there already is an indication stored,
2123                          * replace it if the new one is a disconnect. The
2124                          * reasoning is that disconnection takes less time
2125                          * to process, and once a client decides to
2126                          * disconnect, we should do that.
2127                          */
2128                         if (mir->mir_svc_pend_mp) {
2129                                 if (type == T_DISCON_IND) {
2130                                         RPCLOG(16, "mir_rput_proto: replacing"
2131                                             " held disconnect/ord rel"
2132                                             " indication with disconnect on"
2133                                             " 0x%p\n", (void *)q);
2134 
2135                                         freemsg(mir->mir_svc_pend_mp);
2136                                         mir->mir_svc_pend_mp = mp;
2137                                 } else {
2138                                         RPCLOG(16, "mir_rput_proto: already "
2139                                             "held a disconnect/ord rel "
2140                                             "indication. freeing ord rel "
2141                                             "ind on 0x%p\n", (void *)q);
2142                                         freemsg(mp);
2143                                 }
2144                         } else
2145                                 mir->mir_svc_pend_mp = mp;
2146 
2147                         mutex_exit(&mir->mir_mutex);
2148                         return;
2149 
2150                 default:
2151                         /* nfsd handles server-side non-data messages. */
2152                         break;
2153                 }
2154                 break;
2155 
2156         default:
2157                 break;
2158         }
2159 
             /* Not consumed above: pass the message upstream. */
2160         putnext(q, mp);
2161 }
2162 
2163 /*
2164  * The server-side read queues are used to hold inbound messages while
2165  * outbound flow control is exerted.  When outbound flow control is
2166  * relieved, mir_wsrv qenables the read-side queue.  Read-side queues
2167  * are not enabled by STREAMS and are explicitly noenable'ed in mir_open.
2168  *
2169  * For the server side,  we have two types of messages queued. The first type
2170  * are messages that are ready to be XDR decoded and then sent to the
2171  * RPC program's dispatch routine. The second type are "raw" messages that
2172  * haven't been processed, i.e. assembled from rpc record fragments into
2173  * full requests. The only time we will see the second type of message
2174  * queued is if we have a memory allocation failure while processing
2175  * a raw message. The field mir_first_non_processed_mblk will mark the
2176  * first such raw message. So the flow for server side is:
2177  *
2178  *      - send processed queued messages to kRPC until we run out or find
2179  *        one that needs additional processing because we were short on memory
2180  *        earlier
2181  *      - process a message that was deferred because of lack of
2182  *        memory
2183  *      - continue processing messages until the queue empties or we
2184  *        have to stop because of lack of memory
2185  *      - during each of the above phases, if the queue is empty and
2186  *        there are no pending messages that were passed to the RPC
2187  *        layer, send upstream the pending disconnect/ordrel indication if
2188  *        there is one
2189  *
2190  * The read-side queue is also enabled by a bufcall callback if dupmsg
2191  * fails in mir_rput.
      *
      * NOTE(review): the body below hands every dequeued message straight to
      * svc_queuereq() (or frees it) and never references
      * mir_first_non_processed_mblk, so the description of "raw" messages
      * and of the bufcall callback above may be stale -- confirm.
2192  */
2193 static void
2194 mir_rsrv(queue_t *q)
2195 {
2196         mir_t   *mir;
2197         mblk_t  *mp;
2198         mblk_t  *cmp = NULL;    /* held disconnect/ord rel indication */
2199         boolean_t stop_timer = B_FALSE;
2200 
2201         mir = (mir_t *)q->q_ptr;
2202         mutex_enter(&mir->mir_mutex);
2203 
2204         mp = NULL;
2205         switch (mir->mir_type) {
2206         case RPC_SERVER:
                     /* No requests outstanding: stop holding inbound data. */
2207                 if (mir->mir_ref_cnt == 0)
2208                         mir->mir_hold_inbound = 0;
2209                 if (mir->mir_hold_inbound) {
                             /*
                              * Inbound messages are still being held, so the
                              * queue is left alone.  If it is already empty
                              * and the stream is quiesced, pick up any held
                              * disconnect/ord rel indication so that it can be
                              * sent upstream once mir_mutex is dropped.
                              */
2210 
2211                         ASSERT(cmp == NULL);
2212                         if (q->q_first == NULL) {
2213 
2214                                 MIR_CLEAR_INRSRV(mir);
2215 
2216                                 if (MIR_SVC_QUIESCED(mir)) {
2217                                         cmp = mir->mir_svc_pend_mp;
2218                                         mir->mir_svc_pend_mp = NULL;
2219                                 }
2220                         }
2221 
2222                         mutex_exit(&mir->mir_mutex);
2223 
2224                         if (cmp != NULL) {
2225                                 RPCLOG(16, "mir_rsrv: line %d: sending a held "
2226                                     "disconnect/ord rel indication upstream\n",
2227                                     __LINE__);
2228                                 putnext(q, cmp);
2229                         }
2230 
2231                         return;
2232                 }
                     /*
                      * Drain the queue.  A message is handed to
                      * svc_queuereq() only while mir_krpc_cell is set and
                      * mir_svc_no_more_msgs is clear; otherwise it is freed.
                      */
2233                 while (mp = getq(q)) {
2234                         if (mir->mir_krpc_cell &&
2235                             (mir->mir_svc_no_more_msgs == 0)) {
2236                                 /*
2237                                  * If we were idle, turn off idle timer since
2238                                  * we aren't idle any more.
2239                                  */
2240                                 if (mir->mir_ref_cnt++ == 0)
2241                                         stop_timer = B_TRUE;
                                     /*
                                      * NOTE(review): this returns without
                                      * mutex_exit(), so it relies on
                                      * mir_check_len() dropping mir_mutex
                                      * when it returns non-zero.  The
                                      * increment of mir_ref_cnt above is not
                                      * undone on this path -- confirm.
                                      */
2242                                 if (mir_check_len(q,
2243                                     (int32_t)msgdsize(mp), mp))
2244                                         return;
2245                                 svc_queuereq(q, mp);
2246                         } else {
2247                                 /*
2248                                  * Count # of times this happens. Should be
2249                                  * never, but experience shows otherwise.
2250                                  */
2251                                 if (mir->mir_krpc_cell == NULL)
2252                                         mir_krpc_cell_null++;
2253                                 freemsg(mp);
2254                         }
2255                 }
2256                 break;
2257         case RPC_CLIENT:
                     /* Nothing is drained here for client streams. */
2258                 break;
2259         default:
2260                 RPCLOG(1, "mir_rsrv: unexpected mir_type %d\n", mir->mir_type);
2261 
2262                 if (q->q_first == NULL)
2263                         MIR_CLEAR_INRSRV(mir);
2264 
2265                 mutex_exit(&mir->mir_mutex);
2266 
2267                 return;
2268         }
2269 
2270         /*
2271          * The timer is stopped after all the messages are processed.
2272          * The reason is that stopping the timer releases the mir_mutex
2273          * lock temporarily.  This means that the request can be serviced
2274          * while we are still processing the message queue.  This is not
2275          * good.  So we stop the timer here instead.
2276          */
2277         if (stop_timer)  {
2278                 RPCLOG(16, "mir_rsrv stopping idle timer on 0x%p because ref "
2279                     "cnt going to non zero\n", (void *)WR(q));
2280                 mir_svc_idle_stop(WR(q), mir);
2281         }
2282 
             /*
              * Queue is empty: run MIR_CLEAR_INRSRV() (presumably clears the
              * in-service marker -- verify the macro) and, for a quiesced
              * server stream, send any held disconnect/ord rel indication
              * upstream after dropping mir_mutex.
              */
2283         if (q->q_first == NULL) {
2284 
2285                 MIR_CLEAR_INRSRV(mir);
2286 
2287                 ASSERT(cmp == NULL);
2288                 if (mir->mir_type == RPC_SERVER && MIR_SVC_QUIESCED(mir)) {
2289                         cmp = mir->mir_svc_pend_mp;
2290                         mir->mir_svc_pend_mp = NULL;
2291                 }
2292 
2293                 mutex_exit(&mir->mir_mutex);
2294 
2295                 if (cmp != NULL) {
2296                         RPCLOG(16, "mir_rsrv: line %d: sending a held "
2297                             "disconnect/ord rel indication upstream\n",
2298                             __LINE__);
2299                         putnext(q, cmp);
2300                 }
2301 
2302                 return;
2303         }
2304         mutex_exit(&mir->mir_mutex);
2305 }
2306 
2307 static int mir_svc_policy_fails;  /* event mblk allocation failures */
2308 
2309 /*
2310  * Called to send an event code to nfsd/lockd so that it initiates
2311  * connection close.
      *
      * q must be the read queue of the stream (asserted), and mir_mutex must
      * not be held (asserted under DEBUG only).  The event code is placed in
      * a newly allocated message and passed upstream with putnext().
      *
      * Returns 0 on success, or ENOMEM if the message could not be allocated;
      * in that case mir_svc_policy_fails is incremented and nothing is sent.
2312  */
2313 static int
2314 mir_svc_policy_notify(queue_t *q, int event)
2315 {
2316         mblk_t  *mp;
2317 #ifdef DEBUG
2318         mir_t *mir = (mir_t *)q->q_ptr;
2319         ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
2320 #endif
2321         ASSERT(q->q_flag & QREADR);
2322 
2323         /*
2324          * Create an M_DATA message with the event code and pass it to the
2325          * Stream head (nfsd or whoever created the stream will consume it).
2326          */
2327         mp = allocb(sizeof (int), BPRI_HI);
2328 
2329         if (!mp) {
2330 
2331                 mir_svc_policy_fails++;
2332                 RPCLOG(16, "mir_svc_policy_notify: could not allocate event "
2333                     "%d\n", event);
2334                 return (ENOMEM);
2335         }
2336 
             /*
              * Presumably stores the event code as a 32-bit big-endian value
              * at b_rptr -- verify against the U32_TO_BE32 macro.
              */
2337         U32_TO_BE32(event, mp->b_rptr);
2338         mp->b_wptr = mp->b_rptr + sizeof (int);
2339         putnext(q, mp);
2340         return (0);
2341 }
2342 
2343 /*
2344  * Server side: start the close phase. We want to get this rpcmod slot in an
2345  * idle state before mir_close() is called.
      *
      * wq must be the write queue of an RPC_SERVER stream and the caller must
      * hold mir_mutex; all three conditions are asserted.  The mutex is still
      * held on return.
2346  */
2347 static void
2348 mir_svc_start_close(queue_t *wq, mir_t *mir)
2349 {
2350         ASSERT(MUTEX_HELD(&mir->mir_mutex));
2351         ASSERT((wq->q_flag & QREADR) == 0);
2352         ASSERT(mir->mir_type == RPC_SERVER);
2353 
2354 
2355         /*
2356          * Do not accept any more messages.
              * mir_rput() frees inbound data once this flag is set.
2357          */
2358         mir->mir_svc_no_more_msgs = 1;
2359 
2360         /*
2361          * Next two statements will make the read service procedure invoke
2362          * svc_queuereq() on everything stuck in the streams read queue.
2363          * It's not necessary because enabling the write queue will
2364          * have the same effect, but why not speed the process along?
2365          */
             /*
              * NOTE(review): with mir_svc_no_more_msgs set above, mir_rsrv()
              * as written frees queued messages instead of calling
              * svc_queuereq() on them -- confirm which behaviour is intended.
              */
2366         mir->mir_hold_inbound = 0;
2367         qenable(RD(wq));
2368 
2369         /*
2370          * Meanwhile force the write service procedure to send the
2371          * responses downstream, regardless of flow control.
2372          */
2373         qenable(wq);
2374 }
2375 
2376 /*
2377  * This routine is called directly by KRPC after a request is completed,
2378  * whether a reply was sent or the request was dropped.
2379  */
2380 static void
2381 mir_svc_release(queue_t *wq, mblk_t *mp)
2382 {
2383         mir_t   *mir = (mir_t *)wq->q_ptr;
2384         mblk_t  *cmp = NULL;
2385 
2386         ASSERT((wq->q_flag & QREADR) == 0);
2387         if (mp)
2388                 freemsg(mp);
2389 
2390         mutex_enter(&mir->mir_mutex);
2391 
2392         /*
2393          * Start idle processing if this is the last reference.
2394          */
2395         if ((mir->mir_ref_cnt == 1) && (mir->mir_inrservice == 0)) {
2396                 cmp = mir->mir_svc_pend_mp;
2397                 mir->mir_svc_pend_mp = NULL;
2398         }
2399 
2400         if (cmp) {
2401                 RPCLOG(16, "mir_svc_release: sending a held "
2402                     "disconnect/ord rel indication upstream on queue 0x%p\n",
2403                     (void *)RD(wq));
2404 
2405                 mutex_exit(&mir->mir_mutex);
2406 
2407                 putnext(RD(wq), cmp);
2408 
2409                 mutex_enter(&mir->mir_mutex);
2410         }
2411 
2412         /*
2413          * Start idle processing if this is the last reference.
2414          */
2415         if (mir->mir_ref_cnt == 1 && mir->mir_inrservice == 0) {
2416 
2417                 RPCLOG(16, "mir_svc_release starting idle timer on 0x%p "
2418                     "because ref cnt is zero\n", (void *) wq);
2419 
2420                 mir_svc_idle_start(wq, mir);
2421         }
2422 
2423         mir->mir_ref_cnt--;
2424         ASSERT(mir->mir_ref_cnt >= 0);
2425 
2426         /*
2427          * Wake up the thread waiting to close.
2428          */
2429 
2430         if ((mir->mir_ref_cnt == 0) && mir->mir_closing)
2431                 cv_signal(&mir->mir_condvar);
2432 
2433         mutex_exit(&mir->mir_mutex);
2434 }
2435 
2436 /*
2437  * This routine is called by server-side KRPC when it is ready to
2438  * handle inbound messages on the stream.
2439  */
2440 static void
2441 mir_svc_start(queue_t *wq)
2442 {
2443         mir_t   *mir = (mir_t *)wq->q_ptr;
2444 
2445         /*
2446          * no longer need to take the mir_mutex because the
2447          * mir_setup_complete field has been moved out of
2448          * the binary field protected by the mir_mutex.
2449          */
2450 
2451         mir->mir_setup_complete = 1;
2452         qenable(RD(wq));
2453 }
2454 
2455 /*
2456  * client side wrapper for stopping timer with normal idle timeout.
2457  */
2458 static void
2459 mir_clnt_idle_stop(queue_t *wq, mir_t *mir)
2460 {
2461         ASSERT(MUTEX_HELD(&mir->mir_mutex));
2462         ASSERT((wq->q_flag & QREADR) == 0);
2463         ASSERT(mir->mir_type == RPC_CLIENT);
2464 
2465         mir_timer_stop(mir);
2466 }
2467 
2468 /*
2469  * client side wrapper for stopping timer with normal idle timeout.
2470  */
2471 static void
2472 mir_clnt_idle_start(queue_t *wq, mir_t *mir)
2473 {
2474         ASSERT(MUTEX_HELD(&mir->mir_mutex));
2475         ASSERT((wq->q_flag & QREADR) == 0);
2476         ASSERT(mir->mir_type == RPC_CLIENT);
2477 
2478         mir_timer_start(wq, mir, mir->mir_idle_timeout);
2479 }
2480 
2481 /*
2482  * client side only. Forces rpcmod to stop sending T_ORDREL_REQs on
2483  * end-points that aren't connected.
2484  */
2485 static void
2486 mir_clnt_idle_do_stop(queue_t *wq)
2487 {
2488         mir_t   *mir = (mir_t *)wq->q_ptr;
2489 
2490         RPCLOG(1, "mir_clnt_idle_do_stop: wq 0x%p\n", (void *)wq);
2491         ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
2492         mutex_enter(&mir->mir_mutex);
2493         mir_clnt_idle_stop(wq, mir);
2494         mutex_exit(&mir->mir_mutex);
2495 }
2496 
2497 /*
2498  * Timer handler.  It handles idle timeout and memory shortage problem.
2499  */
2500 static void
2501 mir_timer(void *arg)
2502 {
2503         queue_t *wq = (queue_t *)arg;
2504         mir_t *mir = (mir_t *)wq->q_ptr;
2505         boolean_t notify;
2506 
2507         mutex_enter(&mir->mir_mutex);
2508 
2509         /*
2510          * mir_timer_call is set only when either mir_timer_[start|stop]
2511          * is progressing.  And mir_timer() can only be run while they
2512          * are progressing if the timer is being stopped.  So just
2513          * return.
2514          */
2515         if (mir->mir_timer_call) {
2516                 mutex_exit(&mir->mir_mutex);
2517                 return;
2518         }
2519         mir->mir_timer_id = 0;
2520 
2521         switch (mir->mir_type) {
2522         case RPC_CLIENT:
2523 
2524                 /*
2525                  * For clients, the timer fires at clnt_idle_timeout
2526                  * intervals.  If the activity marker (mir_clntreq) is
2527                  * zero, then the stream has been idle since the last
2528                  * timer event and we notify KRPC.  If mir_clntreq is
2529                  * non-zero, then the stream is active and we just
2530                  * restart the timer for another interval.  mir_clntreq
2531                  * is set to 1 in mir_wput for every request passed
2532                  * downstream.
2533                  *
2534                  * If this was a memory shortage timer reset the idle
2535                  * timeout regardless; the mir_clntreq will not be a
2536                  * valid indicator.
2537                  *
2538                  * The timer is initially started in mir_wput during
2539                  * RPC_CLIENT ioctl processing.
2540                  *
2541                  * The timer interval can be changed for individual
2542                  * streams with the ND variable "mir_idle_timeout".
2543                  */
2544                 if (mir->mir_clntreq > 0 && mir->mir_use_timestamp +
2545                     MSEC_TO_TICK(mir->mir_idle_timeout) - lbolt >= 0) {
2546                         clock_t tout;
2547 
2548                         tout = mir->mir_idle_timeout -
2549                             TICK_TO_MSEC(lbolt - mir->mir_use_timestamp);
2550                         if (tout < 0)
2551                                 tout = 1000;
2552 #if 0
2553                         printf("mir_timer[%d < %d + %d]: reset client timer "
2554                             "to %d (ms)\n", TICK_TO_MSEC(lbolt),
2555                             TICK_TO_MSEC(mir->mir_use_timestamp),
2556                             mir->mir_idle_timeout, tout);
2557 #endif
2558                         mir->mir_clntreq = 0;
2559                         mir_timer_start(wq, mir, tout);
2560                         mutex_exit(&mir->mir_mutex);
2561                         return;
2562                 }
2563 #if 0
2564 printf("mir_timer[%d]: doing client timeout\n", lbolt / hz);
2565 #endif
2566                 /*
2567                  * We are disconnecting, but not necessarily
2568                  * closing. By not closing, we will fail to
2569                  * pick up a possibly changed global timeout value,
2570                  * unless we store it now.
2571                  */
2572                 mir->mir_idle_timeout = clnt_idle_timeout;
2573                 mir_clnt_idle_start(wq, mir);
2574 
2575                 mutex_exit(&mir->mir_mutex);
2576                 /*
2577                  * We pass T_ORDREL_REQ as an integer value
2578                  * to KRPC as the indication that the stream
2579                  * is idle.  This is not a T_ORDREL_REQ message,
2580                  * it is just a convenient value since we call
2581                  * the same KRPC routine for T_ORDREL_INDs and
2582                  * T_DISCON_INDs.
2583                  */
2584                 clnt_dispatch_notifyall(wq, T_ORDREL_REQ, 0);
2585                 return;
2586 
2587         case RPC_SERVER:
2588 
2589                 /*
2590                  * For servers, the timer is only running when the stream
2591                  * is really idle or memory is short.  The timer is started
2592                  * by mir_wput when mir_type is set to RPC_SERVER and
2593                  * by mir_svc_idle_start whenever the stream goes idle
2594                  * (mir_ref_cnt == 0).  The timer is cancelled in
2595                  * mir_rput whenever a new inbound request is passed to KRPC
2596                  * and the stream was previously idle.
2597                  *
2598                  * The timer interval can be changed for individual
2599                  * streams with the ND variable "mir_idle_timeout".
2600                  *
2601                  * If the stream is not idle do nothing.
2602                  */
2603                 if (!MIR_SVC_QUIESCED(mir)) {
2604                         mutex_exit(&mir->mir_mutex);
2605                         return;
2606                 }
2607 
2608                 notify = !mir->mir_inrservice;
2609                 mutex_exit(&mir->mir_mutex);
2610 
2611                 /*
2612                  * If there is no packet queued up in read queue, the stream
2613                  * is really idle so notify nfsd to close it.
2614                  */
2615                 if (notify) {
2616                         RPCLOG(16, "mir_timer: telling stream head listener "
2617                             "to close stream (0x%p)\n", (void *) RD(wq));
2618                         (void) mir_svc_policy_notify(RD(wq), 1);
2619                 }
2620                 return;
2621         default:
2622                 RPCLOG(1, "mir_timer: unexpected mir_type %d\n",
2623                     mir->mir_type);
2624                 mutex_exit(&mir->mir_mutex);
2625                 return;
2626         }
2627 }
2628 
2629 /*
2630  * Called by the RPC package to send either a call or a return, or a
2631  * transport connection request.  Adds the record marking header.
2632  */
2633 static void
2634 mir_wput(queue_t *q, mblk_t *mp)
2635 {
2636         uint_t           frag_header;
2637         mir_t           *mir = (mir_t *)q->q_ptr;
2638         uchar_t         *rptr = mp->b_rptr;
2639         uint32_t         xid;
2640         uint32_t         dir;
2641 
2642         if (!mir) {
2643                 freemsg(mp);
2644                 return;
2645         }
2646 
2647         if (mp->b_datap->db_type != M_DATA) {
2648                 mir_wput_other(q, mp);
2649                 return;
2650         }
2651 
2652         if (mir->mir_ordrel_pending == 1) {
2653                 freemsg(mp);
2654                 RPCLOG(16, "mir_wput wq 0x%p: got data after T_ORDREL_REQ\n",
2655                     (void *)q);
2656                 return;
2657         }
2658 
2659         frag_header = (uint_t)DLEN(mp);
2660         frag_header |= MIR_LASTFRAG;
2661 
2662         /* Stick in the 4 byte record marking header. */
2663         if ((rptr - mp->b_datap->db_base) < sizeof (uint32_t) ||
2664             !IS_P2ALIGNED(mp->b_rptr, sizeof (uint32_t))) {
2665                 /*
2666                  * Since we know that M_DATA messages are created exclusively
2667                  * by KRPC, we expect that KRPC will leave room for our header
2668                  * and 4 byte align which is normal for XDR.
2669                  * If KRPC (or someone else) does not cooperate, then we
2670                  * just throw away the message.
2671                  */
2672                 RPCLOG(1, "mir_wput: KRPC did not leave space for record "
2673                     "fragment header (%d bytes left)\n",
2674                     (int)(rptr - mp->b_datap->db_base));
2675                 freemsg(mp);
2676                 return;
2677         }
2678         rptr -= sizeof (uint32_t);
2679         *(uint32_t *)rptr = htonl(frag_header);
2680         mp->b_rptr = rptr;
2681 
2682         mutex_enter(&mir->mir_mutex);
2683         if (mir->mir_type == RPC_CLIENT) {
2684                 /*
2685                  * For the client, set mir_clntreq to indicate that the
2686                  * connection is active.
2687                  */
2688                 mir->mir_clntreq = 1;
2689                 mir->mir_use_timestamp = lbolt;
2690         }
2691 
2692         /*
2693          * If we haven't already queued some data and the downstream module
2694          * can accept more data, send it on, otherwise we queue the message
2695          * and take other actions depending on mir_type.
2696          */
2697         if (!mir->mir_inwservice && MIR_WCANPUTNEXT(mir, q)) {
2698                 mutex_exit(&mir->mir_mutex);
2699 
2700                 /*
2701                  * Now we pass the RPC message downstream.
2702                  */
2703                 putnext(q, mp);
2704                 return;
2705         }
2706 
2707         /*
2708          * Get msg direction and handle to the appropriate ctxt
2709          */
2710         if (!mir_dir_xid(mp, &dir, &xid)) {
2711                 /* XXX - if we can't get the dir, we're hosed */
2712                 mutex_exit(&mir->mir_mutex);
2713                 freemsg(mp);
2714                 return;
2715         }
2716 
2717         switch (mir->mir_type) {
2718         case RPC_CLIENT:
2719                 /*
2720                  * Check for a previous duplicate request on the
2721                  * queue.  If there is one, then we throw away
2722                  * the current message and let the previous one
2723                  * go through.  If we can't find a duplicate, then
2724                  * send this one.  This tap dance is an effort
2725                  * to reduce traffic and processing requirements
2726                  * under load conditions.
2727                  */
2728                 if (mir_clnt_dup_request(q, mp)) {
2729                         mutex_exit(&mir->mir_mutex);
2730                         freemsg(mp);
2731                         return;
2732                 }
2733                 break;
2734 
2735         case RPC_SERVER:
2736                 switch (dir) {
2737                 case CALL:
2738                         /*
2739                          * RPC Server doing Callball RPC
2740                          */
2741                         if (mir_clnt_dup_request(q, mp)) {
2742                                 mutex_exit(&mir->mir_mutex);
2743                                 freemsg(mp);
2744                                 return;
2745                         }
2746                         break;
2747 
2748                 case REPLY:
2749                 default:
2750                         /*
2751                          * Set mir_hold_inbound so that new inbound RPC
2752                          * messages will be held until the client catches
2753                          * up on the earlier replies.  This flag is cleared
2754                          * in mir_wsrv after flow control is relieved;
2755                          * the read-side queue is also enabled at that time.
2756                          */
2757                         mir->mir_hold_inbound = 1;
2758                         break;
2759                 }
2760                 break;
2761         default:
2762                 RPCLOG(1, "mir_wput: unexpected mir_type %d\n", mir->mir_type);
2763                 break;
2764         }
2765         mir->mir_inwservice = 1;
2766         (void) putq(q, mp);
2767         mutex_exit(&mir->mir_mutex);
2768 }
2769 
/*
 * Write-side handling for every message that is not M_DATA; mir_wput()
 * hands such messages to this routine.  mir_mutex must not be held on
 * entry.
 *
 *   M_IOCTL    RPC_CLIENT and RPC_SERVER set mir_type for the stream
 *              (EPERM if it was already set to a different type), select
 *              the matching maximum message size and idle timeout, start
 *              the idle timer and acknowledge the ioctl.  Any other
 *              command is passed downstream.
 *   M_PROTO    T_DATA_REQ is freed.  T_ORDREL_REQ and T_CONN_REQ get the
 *              special treatment described below.  Everything else, and
 *              whatever survives the special treatment, falls through to
 *              the default handling.  Messages too short or misaligned to
 *              hold a primitive type are passed downstream untouched.
 *   default    High priority messages (db_type >= QPCTL) are passed
 *              downstream; for M_FLUSH with FLUSHW on a client stream the
 *              write queue is flushed first.  Ordinary messages are passed
 *              downstream if nothing is queued and flow control permits,
 *              and are queued for the service procedure otherwise.
 */
static void
mir_wput_other(queue_t *q, mblk_t *mp)
{
        mir_t   *mir = (mir_t *)q->q_ptr;
        struct iocblk   *iocp;
        uchar_t *rptr = mp->b_rptr;
        /* Copied to mir_inwflushdata when the message has to be queued. */
        bool_t  flush_in_svc = FALSE;

        ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
        switch (mp->b_datap->db_type) {
        case M_IOCTL:
                iocp = (struct iocblk *)rptr;
                switch (iocp->ioc_cmd) {
                case RPC_CLIENT:
                        mutex_enter(&mir->mir_mutex);
                        if (mir->mir_type != 0 &&
                            mir->mir_type != iocp->ioc_cmd) {
                                /*
                                 * The stream already has a different type.
                                 * The RPC_SERVER case jumps here as well,
                                 * with mir_mutex held.
                                 */
ioc_eperm:
                                mutex_exit(&mir->mir_mutex);
                                iocp->ioc_error = EPERM;
                                iocp->ioc_count = 0;
                                mp->b_datap->db_type = M_IOCACK;
                                qreply(q, mp);
                                return;
                        }

                        mir->mir_type = iocp->ioc_cmd;

                        /*
                         * Clear mir_hold_inbound which was set to 1 by
                         * mir_open.  This flag is not used on client
                         * streams.
                         */
                        mir->mir_hold_inbound = 0;
                        mir->mir_max_msg_sizep = &clnt_max_msg_size;

                        /*
                         * Start the idle timer.  See mir_timer() for more
                         * information on how client timers work.
                         */
                        mir->mir_idle_timeout = clnt_idle_timeout;
                        mir_clnt_idle_start(q, mir);
                        mutex_exit(&mir->mir_mutex);

                        mp->b_datap->db_type = M_IOCACK;
                        qreply(q, mp);
                        return;
                case RPC_SERVER:
                        mutex_enter(&mir->mir_mutex);
                        if (mir->mir_type != 0 &&
                            mir->mir_type != iocp->ioc_cmd)
                                goto ioc_eperm;

                        /*
                         * We don't clear mir_hold_inbound here because
                         * mir_hold_inbound is used in the flow control
                         * model. If we cleared it here, then we'd commit
                         * a small violation to the model where the transport
                         * might immediately block downstream flow.
                         */

                        mir->mir_type = iocp->ioc_cmd;
                        mir->mir_max_msg_sizep = &svc_max_msg_size;

                        /*
                         * Start the idle timer.  See mir_timer() for more
                         * information on how server timers work.
                         *
                         * Note that it is important to start the idle timer
                         * here so that connections time out even if we
                         * never receive any data on them.
                         */
                        mir->mir_idle_timeout = svc_idle_timeout;
                        RPCLOG(16, "mir_wput_other starting idle timer on 0x%p "
                            "because we got RPC_SERVER ioctl\n", (void *)q);
                        mir_svc_idle_start(q, mir);
                        mutex_exit(&mir->mir_mutex);

                        mp->b_datap->db_type = M_IOCACK;
                        qreply(q, mp);
                        return;
                default:
                        /* Not one of ours: passed downstream below. */
                        break;
                }
                break;

        case M_PROTO:
                if (mir->mir_type == RPC_CLIENT) {
                        /*
                         * We are likely being called from the context of a
                         * service procedure, so we need to enqueue.  However,
                         * enqueuing may put our message behind data messages,
                         * so ask for the data to be flushed first.
                         */
                        flush_in_svc = TRUE;
                }
                /*
                 * Too short or misaligned to read the primitive type:
                 * pass the message downstream as is.
                 */
                if ((mp->b_wptr - rptr) < sizeof (uint32_t) ||
                    !IS_P2ALIGNED(rptr, sizeof (uint32_t)))
                        break;

                switch (((union T_primitives *)rptr)->type) {
                case T_DATA_REQ:
                        /* Don't pass T_DATA_REQ messages downstream. */
                        freemsg(mp);
                        return;
                case T_ORDREL_REQ:
                        RPCLOG(8, "mir_wput_other wq 0x%p: got T_ORDREL_REQ\n",
                            (void *)q);
                        mutex_enter(&mir->mir_mutex);
                        if (mir->mir_type != RPC_SERVER) {
                                /*
                                 * We are likely being called from
                                 * clnt_dispatch_notifyall().  Sending
                                 * a T_ORDREL_REQ will result in some kind
                                 * of _IND message being sent, which will
                                 * lead to another call to
                                 * clnt_dispatch_notifyall().  To keep the
                                 * stack lean, queue this message.
                                 */
                                mir->mir_inwservice = 1;
                                (void) putq(q, mp);
                                mutex_exit(&mir->mir_mutex);
                                return;
                        }

                        /*
                         * Mark the structure such that we don't accept any
                         * more requests from client. We could defer this
                         * until we actually send the orderly release
                         * request downstream, but all that does is delay
                         * the closing of this stream.
                         */
                        RPCLOG(16, "mir_wput_other wq 0x%p: got T_ORDREL_REQ "
                            " so calling mir_svc_start_close\n", (void *)q);

                        mir_svc_start_close(q, mir);

                        /*
                         * If we have sent down a T_ORDREL_REQ, don't send
                         * any more.
                         */
                        if (mir->mir_ordrel_pending) {
                                freemsg(mp);
                                mutex_exit(&mir->mir_mutex);
                                return;
                        }

                        /*
                         * If the stream is not idle, then we hold the
                         * orderly release until it becomes idle.  This
                         * ensures that KRPC will be able to reply to
                         * all requests that we have passed to it.
                         *
                         * We also queue the request if there is data already
                         * queued, because we cannot allow the T_ORDREL_REQ
                         * to go before data. When we had a separate reply
                         * count, this was not a problem, because the
                         * reply count was reconciled when mir_wsrv()
                         * completed.
                         */
                        if (!MIR_SVC_QUIESCED(mir) ||
                            mir->mir_inwservice == 1) {
                                mir->mir_inwservice = 1;
                                (void) putq(q, mp);

                                RPCLOG(16, "mir_wput_other: queuing "
                                    "T_ORDREL_REQ on 0x%p\n", (void *)q);

                                mutex_exit(&mir->mir_mutex);
                                return;
                        }

                        /*
                         * Mark the structure so that we know we sent
                         * an orderly release request, and reset the idle timer.
                         */
                        mir->mir_ordrel_pending = 1;

                        RPCLOG(16, "mir_wput_other: calling mir_svc_idle_start"
                            " on 0x%p because we got T_ORDREL_REQ\n",
                            (void *)q);

                        mir_svc_idle_start(q, mir);
                        mutex_exit(&mir->mir_mutex);

                        /*
                         * When we break, the T_ORDREL_REQ goes through the
                         * default handling below on its way downstream.
                         */
                        break;

                case T_CONN_REQ:
                        mutex_enter(&mir->mir_mutex);
                        /* Drop any partially assembled inbound message. */
                        if (mir->mir_head_mp != NULL) {
                                freemsg(mir->mir_head_mp);
                                mir->mir_head_mp = NULL;
                                mir->mir_tail_mp = NULL;
                        }
                        mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
                        /*
                         * Restart timer in case mir_clnt_idle_do_stop() was
                         * called.
                         */
                        mir->mir_idle_timeout = clnt_idle_timeout;
                        mir_clnt_idle_stop(q, mir);
                        mir_clnt_idle_start(q, mir);
                        mutex_exit(&mir->mir_mutex);
                        break;

                default:
                        /*
                         * T_DISCON_REQ is one of the interesting default
                         * cases here. Ideally, an M_FLUSH is done before
                         * T_DISCON_REQ is done. However, that is somewhat
                         * cumbersome for clnt_cots.c to do. So we queue
                         * T_DISCON_REQ, and let the service procedure
                         * flush all M_DATA.
                         */
                        break;
                }
                /* fallthru */;
        default:
                /*
                 * High priority messages are never queued here; they leave
                 * the switch and are passed downstream.
                 */
                if (mp->b_datap->db_type >= QPCTL) {
                        if (mp->b_datap->db_type == M_FLUSH) {
                                if (mir->mir_type == RPC_CLIENT &&
                                    *mp->b_rptr & FLUSHW) {
                                        RPCLOG(32, "mir_wput_other: flushing "
                                            "wq 0x%p\n", (void *)q);
                                        if (*mp->b_rptr & FLUSHBAND) {
                                                flushband(q, *(mp->b_rptr + 1),
                                                    FLUSHDATA);
                                        } else {
                                                flushq(q, FLUSHDATA);
                                        }
                                } else {
                                        RPCLOG(32, "mir_wput_other: ignoring "
                                            "M_FLUSH on wq 0x%p\n", (void *)q);
                                }
                        }
                        break;
                }

                /*
                 * Ordinary message: pass it on right away only if nothing
                 * is queued ahead of it and flow control allows; otherwise
                 * queue it and schedule the write service procedure.
                 */
                mutex_enter(&mir->mir_mutex);
                if (mir->mir_inwservice == 0 && MIR_WCANPUTNEXT(mir, q)) {
                        mutex_exit(&mir->mir_mutex);
                        break;
                }
                mir->mir_inwservice = 1;
                /* mir_wsrv() frees queued M_DATA when this is set. */
                mir->mir_inwflushdata = flush_in_svc;
                (void) putq(q, mp);
                mutex_exit(&mir->mir_mutex);
                qenable(q);

                return;
        }
        putnext(q, mp);
}
3026 
3027 static void
3028 mir_wsrv(queue_t *q)
3029 {
3030         mblk_t  *mp;
3031         mir_t   *mir;
3032         bool_t flushdata;
3033 
3034         mir = (mir_t *)q->q_ptr;
3035         mutex_enter(&mir->mir_mutex);
3036 
3037         flushdata = mir->mir_inwflushdata;
3038         mir->mir_inwflushdata = 0;
3039 
3040         while (mp = getq(q)) {
3041                 if (mp->b_datap->db_type == M_DATA) {
3042                         /*
3043                          * Do not send any more data if we have sent
3044                          * a T_ORDREL_REQ.
3045                          */
3046                         if (flushdata || mir->mir_ordrel_pending == 1) {
3047                                 freemsg(mp);
3048                                 continue;
3049                         }
3050 
3051                         /*
3052                          * Make sure that the stream can really handle more
3053                          * data.
3054                          */
3055                         if (!MIR_WCANPUTNEXT(mir, q)) {
3056                                 (void) putbq(q, mp);
3057                                 mutex_exit(&mir->mir_mutex);
3058                                 return;
3059                         }
3060 
3061                         /*
3062                          * Now we pass the RPC message downstream.
3063                          */
3064                         mutex_exit(&mir->mir_mutex);
3065                         putnext(q, mp);
3066                         mutex_enter(&mir->mir_mutex);
3067                         continue;
3068                 }
3069 
3070                 /*
3071                  * This is not an RPC message, pass it downstream
3072                  * (ignoring flow control) if the server side is not sending a
3073                  * T_ORDREL_REQ downstream.
3074                  */
3075                 if (mir->mir_type != RPC_SERVER ||
3076                     ((union T_primitives *)mp->b_rptr)->type !=
3077                     T_ORDREL_REQ) {
3078                         mutex_exit(&mir->mir_mutex);
3079                         putnext(q, mp);
3080                         mutex_enter(&mir->mir_mutex);
3081                         continue;
3082                 }
3083 
3084                 if (mir->mir_ordrel_pending == 1) {
3085                         /*
3086                          * Don't send two T_ORDRELs
3087                          */
3088                         freemsg(mp);
3089                         continue;
3090                 }
3091 
3092                 /*
3093                  * Mark the structure so that we know we sent an orderly
3094                  * release request.  We will check to see slot is idle at the
3095                  * end of this routine, and if so, reset the idle timer to
3096                  * handle orderly release timeouts.
3097                  */
3098                 mir->mir_ordrel_pending = 1;
3099                 RPCLOG(16, "mir_wsrv: sending ordrel req on q 0x%p\n",
3100                     (void *)q);
3101                 /*
3102                  * Send the orderly release downstream. If there are other
3103                  * pending replies we won't be able to send them.  However,
3104                  * the only reason we should send the orderly release is if
3105                  * we were idle, or if an unusual event occurred.
3106                  */
3107                 mutex_exit(&mir->mir_mutex);
3108                 putnext(q, mp);
3109                 mutex_enter(&mir->mir_mutex);
3110         }
3111 
3112         if (q->q_first == NULL)
3113                 /*
3114                  * If we call mir_svc_idle_start() below, then
3115                  * clearing mir_inwservice here will also result in
3116                  * any thread waiting in mir_close() to be signaled.
3117                  */
3118                 mir->mir_inwservice = 0;
3119 
3120         if (mir->mir_type != RPC_SERVER) {
3121                 mutex_exit(&mir->mir_mutex);
3122                 return;
3123         }
3124 
3125         /*
3126          * If idle we call mir_svc_idle_start to start the timer (or wakeup
3127          * a close). Also make sure not to start the idle timer on the
3128          * listener stream. This can cause nfsd to send an orderly release
3129          * command on the listener stream.
3130          */
3131         if (MIR_SVC_QUIESCED(mir) && !(mir->mir_listen_stream)) {
3132                 RPCLOG(16, "mir_wsrv: calling mir_svc_idle_start on 0x%p "
3133                     "because mir slot is idle\n", (void *)q);
3134                 mir_svc_idle_start(q, mir);
3135         }
3136 
3137         /*
3138          * If outbound flow control has been relieved, then allow new
3139          * inbound requests to be processed.
3140          */
3141         if (mir->mir_hold_inbound) {
3142                 mir->mir_hold_inbound = 0;
3143                 qenable(RD(q));
3144         }
3145         mutex_exit(&mir->mir_mutex);
3146 }
3147 
3148 static void
3149 mir_disconnect(queue_t *q, mir_t *mir)
3150 {
3151         ASSERT(MUTEX_HELD(&mir->mir_mutex));
3152 
3153         switch (mir->mir_type) {
3154         case RPC_CLIENT:
3155                 /*
3156                  * We are disconnecting, but not necessarily
3157                  * closing. By not closing, we will fail to
3158                  * pick up a possibly changed global timeout value,
3159                  * unless we store it now.
3160                  */
3161                 mir->mir_idle_timeout = clnt_idle_timeout;
3162                 mir_clnt_idle_start(WR(q), mir);
3163                 mutex_exit(&mir->mir_mutex);
3164 
3165                 /*
3166                  * T_DISCON_REQ is passed to KRPC as an integer value
3167                  * (this is not a TPI message).  It is used as a
3168                  * convenient value to indicate a sanity check
3169                  * failure -- the same KRPC routine is also called
3170                  * for T_DISCON_INDs and T_ORDREL_INDs.
3171                  */
3172                 clnt_dispatch_notifyall(WR(q), T_DISCON_REQ, 0);
3173                 break;
3174 
3175         case RPC_SERVER:
3176                 mir->mir_svc_no_more_msgs = 1;
3177                 mir_svc_idle_stop(WR(q), mir);
3178                 mutex_exit(&mir->mir_mutex);
3179                 RPCLOG(16, "mir_disconnect: telling "
3180                     "stream head listener to disconnect stream "
3181                     "(0x%p)\n", (void *) q);
3182                 (void) mir_svc_policy_notify(q, 2);
3183                 break;
3184 
3185         default:
3186                 mutex_exit(&mir->mir_mutex);
3187                 break;
3188         }
3189 }
3190 
3191 /*
3192  * Sanity check the message length, and if it's too large, shutdown the
3193  * connection.  Returns 1 if the connection is shutdown; 0 otherwise.
3194  */
3195 static int
3196 mir_check_len(queue_t *q, int32_t frag_len, mblk_t *head_mp)
3197 {
3198         mir_t *mir = q->q_ptr;
3199         uint_t maxsize = 0;
3200 
3201         if (mir->mir_max_msg_sizep != NULL)
3202                 maxsize = *mir->mir_max_msg_sizep;
3203 
3204         if (maxsize == 0 || frag_len <= (int)maxsize)
3205                 return (0);
3206 
3207         freemsg(head_mp);
3208         mir->mir_head_mp = NULL;
3209         mir->mir_tail_mp = NULL;
3210         mir->mir_frag_header = 0;
3211         mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
3212         if (mir->mir_type != RPC_SERVER || mir->mir_setup_complete) {
3213                 cmn_err(CE_NOTE,
3214                     "KRPC: record fragment from %s of size(%d) exceeds "
3215                     "maximum (%u). Disconnecting",
3216                     (mir->mir_type == RPC_CLIENT) ? "server" :
3217                     (mir->mir_type == RPC_SERVER) ? "client" :
3218                     "test tool", frag_len, maxsize);
3219         }
3220 
3221         mir_disconnect(q, mir);
3222         return (1);
3223 }
--- EOF ---