Move CallBack Server thread creation, initial processing and destruction to RPC.
Clean up some RPC code.
Remove extraneous fields from nfs41_cb_info and clean up the code.
Change KM_SLEEP in mir_nfs41_callback_thread to KM_NOSLEEP.
Fix lint warnings.
Incorporate code review comments.

   1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
  23  * Use is subject to license terms.
  24  */
  25 /* Copyright (c) 1990 Mentat Inc. */
  26 
  27 /*      Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T     */
  28 /*        All Rights Reserved   */
  29 
  30 /*
  31  * Kernel RPC filtering module
  32  */
  33 
  34 #include <sys/param.h>
  35 #include <sys/types.h>
  36 #include <sys/stream.h>
  37 #include <sys/stropts.h>
  38 #include <sys/tihdr.h>
  39 #include <sys/timod.h>
  40 #include <sys/tiuser.h>
  41 #include <sys/debug.h>
  42 #include <sys/signal.h>
  43 #include <sys/pcb.h>
  44 #include <sys/user.h>
  45 #include <sys/errno.h>
  46 #include <sys/cred.h>
  47 #include <sys/policy.h>
  48 #include <sys/inline.h>
  49 #include <sys/cmn_err.h>
  50 #include <sys/kmem.h>
  51 #include <sys/file.h>
  52 #include <sys/sysmacros.h>
  53 #include <sys/systm.h>
  54 #include <sys/t_lock.h>
  55 #include <sys/ddi.h>
  56 #include <sys/vtrace.h>
  57 #include <sys/callb.h>
  58 #include <sys/strsun.h>
  59 
  60 #include <sys/strlog.h>
  61 #include <rpc/rpc_com.h>
  62 #include <inet/common.h>
  63 #include <rpc/types.h>
  64 #include <sys/time.h>
  65 #include <rpc/xdr.h>
  66 #include <rpc/auth.h>
  67 #include <rpc/clnt.h>
  68 #include <rpc/rpc_msg.h>
  69 #include <rpc/clnt.h>
  70 #include <rpc/svc.h>
  71 #include <rpc/rpcsys.h>
  72 #include <rpc/rpc_rdma.h>
  73 #include <sys/sdt.h>
  74 
  75 /*
  76  * This is the loadable module wrapper.
  77  */
  78 #include <sys/conf.h>
  79 #include <sys/modctl.h>
  80 #include <sys/syscall.h>
  81 
  82 extern struct streamtab rpcinfo;
  83 
  84 static struct fmodsw fsw = {
  85         "rpcmod",
  86         &rpcinfo,
  87         D_NEW|D_MP,
  88 };
  89 
  90 /*
  91  * Module linkage information for the kernel.
  92  */
  93 
  94 static struct modlstrmod modlstrmod = {
  95         &mod_strmodops, "rpc interface str mod", &fsw
  96 };
  97 
  98 /*
  99  * For the RPC system call.
 100  */
 101 static struct sysent rpcsysent = {
 102         2,
 103         SE_32RVAL1 | SE_ARGC | SE_NOUNLOAD,
 104         rpcsys
 105 };
 106 
 107 static struct modlsys modlsys = {
 108         &mod_syscallops,
 109         "RPC syscall",
 110         &rpcsysent
 111 };
 112 
 113 #ifdef _SYSCALL32_IMPL
 114 static struct modlsys modlsys32 = {
 115         &mod_syscallops32,
 116         "32-bit RPC syscall",
 117         &rpcsysent
 118 };
 119 #endif /* _SYSCALL32_IMPL */
 120 
 121 static struct modlinkage modlinkage = {
 122         MODREV_1,
 123         {
 124                 &modlsys,
 125 #ifdef _SYSCALL32_IMPL
 126                 &modlsys32,
 127 #endif
 128                 &modlstrmod,
 129                 NULL
 130         }
 131 };
 132 
 133 int
 134 _init(void)
 135 {
 136         int error = 0;
 137         callb_id_t cid;
 138         int status;
 139 
 140         svc_init();
 141         clnt_init();
 142         cid = callb_add(connmgr_cpr_reset, 0, CB_CL_CPR_RPC, "rpc");
 143 
 144         if (error = mod_install(&modlinkage)) {
 145                 /*
 146                  * Could not install module, cleanup previous
 147                  * initialization work.
 148                  */
 149                 clnt_fini();
 150                 if (cid != NULL)
 151                         (void) callb_delete(cid);
 152 
 153                 return (error);
 154         }
 155 
 156         /*
 157          * Load up the RDMA plugins and initialize the stats. Even if the
 158          * plugins loadup fails, but rpcmod was successfully installed the
 159          * counters still get initialized.
 160          */
 161         rw_init(&rdma_lock, NULL, RW_DEFAULT, NULL);
 162         mutex_init(&rdma_modload_lock, NULL, MUTEX_DEFAULT, NULL);
 163         mt_kstat_init();
 164 
 165         /*
 166          * Get our identification into ldi.  This is used for loading
 167          * other modules, e.g. rpcib.
 168          */
 169         status = ldi_ident_from_mod(&modlinkage, &rpcmod_li);
 170         if (status != 0) {
 171                 cmn_err(CE_WARN, "ldi_ident_from_mod fails with %d", status);
 172                 rpcmod_li = NULL;
 173         }
 174 
 175         return (error);
 176 }
 177 
 178 /*
 179  * The unload entry point fails, because we advertise entry points into
 180  * rpcmod from the rest of kRPC: rpcmod_release().
 181  */
 182 int
 183 _fini(void)
 184 {
 185         return (EBUSY);
 186 }
 187 
 188 int
 189 _info(struct modinfo *modinfop)
 190 {
 191         return (mod_info(&modlinkage, modinfop));
 192 }
 193 
 194 extern int nulldev();
 195 
 196 #define RPCMOD_ID       2049
 197 
 198 int rmm_open(), rmm_close();
 199 
 200 /*
 201  * To save instructions, since STREAMS ignores the return value
 202  * from these functions, they are defined as void here. Kind of icky, but...
 203  */
 204 void rmm_rput(queue_t *, mblk_t *);
 205 void rmm_wput(queue_t *, mblk_t *);
 206 void rmm_rsrv(queue_t *);
 207 void rmm_wsrv(queue_t *);
 208 
 209 int rpcmodopen(), rpcmodclose();
 210 void rpcmodrput(), rpcmodwput();
 211 void rpcmodrsrv(), rpcmodwsrv();
 212 
 213 static  void    rpcmodwput_other(queue_t *, mblk_t *);
 214 static  int     mir_close(queue_t *q);
 215 static  int     mir_open(queue_t *q, dev_t *devp, int flag, int sflag,
 216                     cred_t *credp);
 217 static  void    mir_rput(queue_t *q, mblk_t *mp);
 218 static  void    mir_rsrv(queue_t *q);
 219 static  void    mir_wput(queue_t *q, mblk_t *mp);
 220 static  void    mir_wsrv(queue_t *q);
 221 
 222 static struct module_info rpcmod_info =
 223         {RPCMOD_ID, "rpcmod", 0, INFPSZ, 256*1024, 1024};
 224 
 225 /*
 226  * Read side has no service procedure.
 227  */
 228 static struct qinit rpcmodrinit = {
 229         (int (*)())rmm_rput,
 230         (int (*)())rmm_rsrv,
 231         rmm_open,
 232         rmm_close,
 233         nulldev,
 234         &rpcmod_info,
 235         NULL
 236 };
 237 
 238 /*
 239  * The write put procedure is simply putnext to conserve stack space.
 240  * The write service procedure is not used to queue data, but instead to
 241  * synchronize with flow control.
 242  */
 243 static struct qinit rpcmodwinit = {
 244         (int (*)())rmm_wput,
 245         (int (*)())rmm_wsrv,
 246         rmm_open,
 247         rmm_close,
 248         nulldev,
 249         &rpcmod_info,
 250         NULL
 251 };
 252 struct streamtab rpcinfo = { &rpcmodrinit, &rpcmodwinit, NULL, NULL };
 253 
 254 struct xprt_style_ops {
 255         int (*xo_open)();
 256         int (*xo_close)();
 257         void (*xo_wput)();
 258         void (*xo_wsrv)();
 259         void (*xo_rput)();
 260         void (*xo_rsrv)();
 261 };
 262 
 263 static struct xprt_style_ops xprt_clts_ops = {
 264         rpcmodopen,
 265         rpcmodclose,
 266         rpcmodwput,
 267         rpcmodwsrv,
 268         rpcmodrput,
 269         NULL
 270 };
 271 
 272 static struct xprt_style_ops xprt_cots_ops = {
 273         mir_open,
 274         mir_close,
 275         mir_wput,
 276         mir_wsrv,
 277         mir_rput,
 278         mir_rsrv
 279 };
 280 
 281 /*
 282  * Per rpcmod "slot" data structure. q->q_ptr points to one of these.
 283  */
 284 struct rpcm {
 285         void            *rm_krpc_cell;  /* Reserved for use by KRPC */
 286         struct          xprt_style_ops  *rm_ops;
 287         int             rm_type;        /* Client or server side stream */
 288 #define RM_CLOSING      0x1             /* somebody is trying to close slot */
 289         uint_t          rm_state;       /* state of the slot. see above */
 290         uint_t          rm_ref;         /* cnt of external references to slot */
 291         kmutex_t        rm_lock;        /* mutex protecting above fields */
 292         kcondvar_t      rm_cwait;       /* condition for closing */
 293         zoneid_t        rm_zoneid;      /* zone which pushed rpcmod */
 294 };
 295 
 296 struct temp_slot {
 297         void *cell;
 298         struct xprt_style_ops *ops;
 299         int type;
 300         mblk_t *info_ack;
 301         kmutex_t lock;
 302         kcondvar_t wait;
 303 };
 304 
 305 typedef struct mir_s {
 306         void    *mir_krpc_cell; /* Reserved for KRPC use. This field */
 307                                         /* must be first in the structure. */
 308         struct xprt_style_ops   *rm_ops;
 309         int     mir_type;               /* Client or server side stream */
 310 
 311         mblk_t  *mir_head_mp;           /* RPC msg in progress */
 312                 /*
 313                  * mir_head_mp points the first mblk being collected in
 314                  * the current RPC message.  Record headers are removed
 315                  * before data is linked into mir_head_mp.
 316                  */
 317         mblk_t  *mir_tail_mp;           /* Last mblk in mir_head_mp */
 318                 /*
 319                  * mir_tail_mp points to the last mblk in the message
 320                  * chain starting at mir_head_mp.  It is only valid
 321                  * if mir_head_mp is non-NULL and is used to add new
 322                  * data blocks to the end of chain quickly.
 323                  */
 324 
 325         int32_t mir_frag_len;           /* Bytes seen in the current frag */
 326                 /*
 327                  * mir_frag_len starts at -4 for beginning of each fragment.
 328                  * When this length is negative, it indicates the number of
 329                  * bytes that rpcmod needs to complete the record marker
 330                  * header.  When it is positive or zero, it holds the number
 331                  * of bytes that have arrived for the current fragment and
 332                  * are held in mir_header_mp.
 333                  */
 334 
 335         int32_t mir_frag_header;
 336                 /*
 337                  * Fragment header as collected for the current fragment.
 338                  * It holds the last-fragment indicator and the number
 339                  * of bytes in the fragment.
 340                  */
 341 
 342         unsigned int
 343                 mir_ordrel_pending : 1, /* Sent T_ORDREL_REQ */
 344                 mir_hold_inbound : 1,   /* Hold inbound messages on server */
 345                                         /* side until outbound flow control */
 346                                         /* is relieved. */
 347                 mir_closing : 1,        /* The stream is being closed */
 348                 mir_inrservice : 1,     /* data queued or rd srv proc running */
 349                 mir_inwservice : 1,     /* data queued or wr srv proc running */
 350                 mir_inwflushdata : 1,   /* flush M_DATAs when srv runs */
 351                 /*
 352                  * On client streams, mir_clntreq is 0 or 1; it is set
 353                  * to 1 whenever a new request is sent out (mir_wput)
 354                  * and cleared when the timer fires (mir_timer).  If
 355                  * the timer fires with this value equal to 0, then the
 356                  * stream is considered idle and KRPC is notified.
 357                  */
 358                 mir_clntreq : 1,
 359                 /*
 360                  * On server streams, stop accepting messages
 361                  */
 362                 mir_svc_no_more_msgs : 1,
 363                 mir_listen_stream : 1,  /* listen end point */
 364                 mir_unused : 1, /* no longer used */
 365                 mir_timer_call : 1,
 366                 mir_junk_fill_thru_bit_31 : 21;
 367 
 368         int     mir_setup_complete;     /* server has initialized everything */
 369         timeout_id_t mir_timer_id;      /* Timer for idle checks */
 370         clock_t mir_idle_timeout;       /* Allowed idle time before shutdown */
 371                 /*
 372                  * This value is copied from clnt_idle_timeout or
 373                  * svc_idle_timeout during the appropriate ioctl.
 374                  * Kept in milliseconds
 375                  */
 376         clock_t mir_use_timestamp;      /* updated on client with each use */
 377                 /*
 378                  * This value is set to lbolt
 379                  * every time a client stream sends or receives data.
 380                  * Even if the timer message arrives, we don't shutdown
 381                  * client unless:
 382                  *    lbolt >= MSEC_TO_TICK(mir_idle_timeout)+mir_use_timestamp.
 383                  * This value is kept in HZ.
 384                  */
 385 
 386         uint_t  *mir_max_msg_sizep;     /* Reference to sanity check size */
 387                 /*
 388                  * This pointer is set to &clnt_max_msg_size or
 389                  * &svc_max_msg_size during the appropriate ioctl.
 390                  */
 391         zoneid_t mir_zoneid;    /* zone which pushed rpcmod */
 392         /* Server-side fields. */
 393         int     mir_ref_cnt;            /* Reference count: server side only */
 394                                         /* counts the number of references */
 395                                         /* that a kernel RPC server thread */
 396                                         /* (see svc_run()) has on this rpcmod */
 397                                         /* slot. Effectively, it is the */
 398                                         /* number * of unprocessed messages */
 399                                         /* that have been passed up to the */
 400                                         /* KRPC layer */
 401 
 402         mblk_t  *mir_svc_pend_mp;       /* Pending T_ORDREL_IND or */
 403                                         /* T_DISCON_IND */
 404 
 405         /*
 406          * these fields are for both client and server, but for debugging,
 407          * it is easier to have these last in the structure.
 408          */
 409         kmutex_t        mir_mutex;      /* Mutex and condvar for close */
 410         kcondvar_t      mir_condvar;    /* synchronization. */
 411         kcondvar_t      mir_timer_cv;   /* Timer routine sync. */
 412         void            *mir_cb;        /* For callbacks */
 413 } mir_t;
 414 
 415 void tmp_rput(queue_t *q, mblk_t *mp);
 416 
 417 struct xprt_style_ops tmpops = {
 418         NULL,
 419         NULL,
 420         putnext,
 421         NULL,
 422         tmp_rput,
 423         NULL
 424 };
 425 
 426 void
 427 tmp_rput(queue_t *q, mblk_t *mp)
 428 {
 429         struct temp_slot *t = (struct temp_slot *)(q->q_ptr);
 430         struct T_info_ack *pptr;
 431 
 432         switch (mp->b_datap->db_type) {
 433         case M_PCPROTO:
 434                 pptr = (struct T_info_ack *)mp->b_rptr;
 435                 switch (pptr->PRIM_type) {
 436                 case T_INFO_ACK:
 437                         mutex_enter(&t->lock);
 438                         t->info_ack = mp;
 439                         cv_signal(&t->wait);
 440                         mutex_exit(&t->lock);
 441                         return;
 442                 default:
 443                         break;
 444                 }
 445         default:
 446                 break;
 447         }
 448 
 449         /*
 450          * Not an info-ack, so free it. This is ok because we should
 451          * not be receiving data until the open finishes: rpcmod
 452          * is pushed well before the end-point is bound to an address.
 453          */
 454         freemsg(mp);
 455 }
 456 
 457 int
 458 rmm_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
 459 {
 460         mblk_t *bp;
 461         struct temp_slot ts, *t;
 462         struct T_info_ack *pptr;
 463         int error = 0;
 464 
 465         ASSERT(q != NULL);
 466         /*
 467          * Check for re-opens.
 468          */
 469         if (q->q_ptr) {
 470                 TRACE_1(TR_FAC_KRPC, TR_RPCMODOPEN_END,
 471                     "rpcmodopen_end:(%s)", "q->qptr");
 472                 return (0);
 473         }
 474 
 475         t = &ts;
 476         bzero(t, sizeof (*t));
 477         q->q_ptr = (void *)t;
 478         WR(q)->q_ptr = (void *)t;
 479 
 480         /*
 481          * Allocate the required messages upfront.
 482          */
 483         if ((bp = allocb(sizeof (struct T_info_req) +
 484             sizeof (struct T_info_ack), BPRI_LO)) == (mblk_t *)NULL) {
 485                 return (ENOBUFS);
 486         }
 487 
 488         mutex_init(&t->lock, NULL, MUTEX_DEFAULT, NULL);
 489         cv_init(&t->wait, NULL, CV_DEFAULT, NULL);
 490 
 491         t->ops = &tmpops;
 492 
 493         qprocson(q);
 494         bp->b_datap->db_type = M_PCPROTO;
 495         *(int32_t *)bp->b_wptr = (int32_t)T_INFO_REQ;
 496         bp->b_wptr += sizeof (struct T_info_req);
 497         putnext(WR(q), bp);
 498 
 499         mutex_enter(&t->lock);
 500         while (t->info_ack == NULL) {
 501                 if (cv_wait_sig(&t->wait, &t->lock) == 0) {
 502                         error = EINTR;
 503                         break;
 504                 }
 505         }
 506         mutex_exit(&t->lock);
 507 
 508         if (error)
 509                 goto out;
 510 
 511         pptr = (struct T_info_ack *)t->info_ack->b_rptr;
 512 
 513         if (pptr->SERV_type == T_CLTS) {
 514                 if ((error = rpcmodopen(q, devp, flag, sflag, crp)) == 0)
 515                         ((struct rpcm *)q->q_ptr)->rm_ops = &xprt_clts_ops;
 516         } else {
 517                 if ((error = mir_open(q, devp, flag, sflag, crp)) == 0)
 518                         ((mir_t *)q->q_ptr)->rm_ops = &xprt_cots_ops;
 519         }
 520 
 521 out:
 522         if (error)
 523                 qprocsoff(q);
 524 
 525         freemsg(t->info_ack);
 526         mutex_destroy(&t->lock);
 527         cv_destroy(&t->wait);
 528 
 529         return (error);
 530 }
 531 
 532 void
 533 rmm_rput(queue_t *q, mblk_t  *mp)
 534 {
 535         (*((struct temp_slot *)q->q_ptr)->ops->xo_rput)(q, mp);
 536 }
 537 
 538 void
 539 rmm_rsrv(queue_t *q)
 540 {
 541         (*((struct temp_slot *)q->q_ptr)->ops->xo_rsrv)(q);
 542 }
 543 
 544 void
 545 rmm_wput(queue_t *q, mblk_t *mp)
 546 {
 547         (*((struct temp_slot *)q->q_ptr)->ops->xo_wput)(q, mp);
 548 }
 549 
 550 void
 551 rmm_wsrv(queue_t *q)
 552 {
 553         (*((struct temp_slot *)q->q_ptr)->ops->xo_wsrv)(q);
 554 }
 555 
 556 int
 557 rmm_close(queue_t *q, int flag, cred_t *crp)
 558 {
 559         return ((*((struct temp_slot *)q->q_ptr)->ops->xo_close)(q, flag, crp));
 560 }
 561 
 562 /*
 563  * rpcmodopen - open routine gets called when the module gets pushed
 564  *              onto the stream.
 565  */
 566 /*ARGSUSED*/
 567 int
 568 rpcmodopen(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *crp)
 569 {
 570         struct rpcm *rmp;
 571 
 572         extern void (*rpc_rele)(queue_t *, mblk_t *);
 573         static void rpcmod_release(queue_t *, mblk_t *);
 574 
 575         TRACE_0(TR_FAC_KRPC, TR_RPCMODOPEN_START, "rpcmodopen_start:");
 576 
 577         /*
 578          * Initialize entry points to release a rpcmod slot (and an input
 579          * message if supplied) and to send an output message to the module
 580          * below rpcmod.
 581          */
 582         if (rpc_rele == NULL)
 583                 rpc_rele = rpcmod_release;
 584 
 585         /*
 586          * Only sufficiently privileged users can use this module, and it
 587          * is assumed that they will use this module properly, and NOT send
 588          * bulk data from downstream.
 589          */
 590         if (secpolicy_rpcmod_open(crp) != 0)
 591                 return (EPERM);
 592 
 593         /*
 594          * Allocate slot data structure.
 595          */
 596         rmp = kmem_zalloc(sizeof (*rmp), KM_SLEEP);
 597 
 598         mutex_init(&rmp->rm_lock, NULL, MUTEX_DEFAULT, NULL);
 599         cv_init(&rmp->rm_cwait, NULL, CV_DEFAULT, NULL);
 600         rmp->rm_zoneid = rpc_zoneid();
 601         /*
 602          * slot type will be set by kRPC client and server ioctl's
 603          */
 604         rmp->rm_type = 0;
 605 
 606         q->q_ptr = (void *)rmp;
 607         WR(q)->q_ptr = (void *)rmp;
 608 
 609         TRACE_1(TR_FAC_KRPC, TR_RPCMODOPEN_END, "rpcmodopen_end:(%s)", "end");
 610         return (0);
 611 }
 612 
 613 /*
 614  * rpcmodclose - This routine gets called when the module gets popped
 615  * off of the stream.
 616  */
 617 /*ARGSUSED*/
 618 int
 619 rpcmodclose(queue_t *q, int flag, cred_t *crp)
 620 {
 621         struct rpcm *rmp;
 622 
 623         ASSERT(q != NULL);
 624         rmp = (struct rpcm *)q->q_ptr;
 625 
 626         /*
 627          * Mark our state as closing.
 628          */
 629         mutex_enter(&rmp->rm_lock);
 630         rmp->rm_state |= RM_CLOSING;
 631 
 632         /*
 633          * Check and see if there are any messages on the queue.  If so, send
 634          * the messages, regardless whether the downstream module is ready to
 635          * accept data.
 636          */
 637         if (rmp->rm_type == RPC_SERVER) {
 638                 flushq(q, FLUSHDATA);
 639 
 640                 qenable(WR(q));
 641 
 642                 if (rmp->rm_ref) {
 643                         mutex_exit(&rmp->rm_lock);
 644                         /*
 645                          * call into SVC to clean the queue
 646                          */
 647                         svc_queueclean(q);
 648                         mutex_enter(&rmp->rm_lock);
 649 
 650                         /*
 651                          * Block while there are kRPC threads with a reference
 652                          * to this message.
 653                          */
 654                         while (rmp->rm_ref)
 655                                 cv_wait(&rmp->rm_cwait, &rmp->rm_lock);
 656                 }
 657 
 658                 mutex_exit(&rmp->rm_lock);
 659 
 660                 /*
 661                  * It is now safe to remove this queue from the stream. No kRPC
 662                  * threads have a reference to the stream, and none ever will,
 663                  * because RM_CLOSING is set.
 664                  */
 665                 qprocsoff(q);
 666 
 667                 /* Notify kRPC that this stream is going away. */
 668                 svc_queueclose(q);
 669         } else {
 670                 mutex_exit(&rmp->rm_lock);
 671                 qprocsoff(q);
 672         }
 673 
 674         q->q_ptr = NULL;
 675         WR(q)->q_ptr = NULL;
 676         mutex_destroy(&rmp->rm_lock);
 677         cv_destroy(&rmp->rm_cwait);
 678         kmem_free(rmp, sizeof (*rmp));
 679         return (0);
 680 }
 681 
 682 #ifdef  DEBUG
 683 int     rpcmod_send_msg_up = 0;
 684 int     rpcmod_send_uderr = 0;
 685 int     rpcmod_send_dup = 0;
 686 int     rpcmod_send_dup_cnt = 0;
 687 #endif
 688 
 689 /*
 690  * rpcmodrput - Module read put procedure.  This is called from
 691  *              the module, driver, or stream head downstream.
 692  */
 693 void
 694 rpcmodrput(queue_t *q, mblk_t *mp)
 695 {
 696         struct rpcm *rmp;
 697         union T_primitives *pptr;
 698         int hdrsz;
 699 
 700         TRACE_0(TR_FAC_KRPC, TR_RPCMODRPUT_START, "rpcmodrput_start:");
 701 
 702         ASSERT(q != NULL);
 703         rmp = (struct rpcm *)q->q_ptr;
 704 
 705         if (rmp->rm_type == 0) {
 706                 freemsg(mp);
 707                 return;
 708         }
 709 
 710 #ifdef DEBUG
 711         if (rpcmod_send_msg_up > 0) {
 712                 mblk_t *nmp = copymsg(mp);
 713                 if (nmp) {
 714                         putnext(q, nmp);
 715                         rpcmod_send_msg_up--;
 716                 }
 717         }
 718         if ((rpcmod_send_uderr > 0) && mp->b_datap->db_type == M_PROTO) {
 719                 mblk_t *nmp;
 720                 struct T_unitdata_ind *data;
 721                 struct T_uderror_ind *ud;
 722                 int d;
 723                 data = (struct T_unitdata_ind *)mp->b_rptr;
 724                 if (data->PRIM_type == T_UNITDATA_IND) {
 725                         d = sizeof (*ud) - sizeof (*data);
 726                         nmp = allocb(mp->b_wptr - mp->b_rptr + d, BPRI_HI);
 727                         if (nmp) {
 728                                 ud = (struct T_uderror_ind *)nmp->b_rptr;
 729                                 ud->PRIM_type = T_UDERROR_IND;
 730                                 ud->DEST_length = data->SRC_length;
 731                                 ud->DEST_offset = data->SRC_offset + d;
 732                                 ud->OPT_length = data->OPT_length;
 733                                 ud->OPT_offset = data->OPT_offset + d;
 734                                 ud->ERROR_type = ENETDOWN;
 735                                 if (data->SRC_length) {
 736                                         bcopy(mp->b_rptr +
 737                                             data->SRC_offset,
 738                                             nmp->b_rptr +
 739                                             ud->DEST_offset,
 740                                             data->SRC_length);
 741                                 }
 742                                 if (data->OPT_length) {
 743                                         bcopy(mp->b_rptr +
 744                                             data->OPT_offset,
 745                                             nmp->b_rptr +
 746                                             ud->OPT_offset,
 747                                             data->OPT_length);
 748                                 }
 749                                 nmp->b_wptr += d;
 750                                 nmp->b_wptr += (mp->b_wptr - mp->b_rptr);
 751                                 nmp->b_datap->db_type = M_PROTO;
 752                                 putnext(q, nmp);
 753                                 rpcmod_send_uderr--;
 754                         }
 755                 }
 756         }
 757 #endif
 758         switch (mp->b_datap->db_type) {
 759         default:
 760                 putnext(q, mp);
 761                 break;
 762 
 763         case M_PROTO:
 764         case M_PCPROTO:
 765                 ASSERT((mp->b_wptr - mp->b_rptr) >= sizeof (int32_t));
 766                 pptr = (union T_primitives *)mp->b_rptr;
 767 
 768                 /*
 769                  * Forward this message to krpc if it is data.
 770                  */
 771                 if (pptr->type == T_UNITDATA_IND) {
 772                         mblk_t *nmp;
 773 
 774                 /*
 775                  * Check if the module is being popped.
 776                  */
 777                         mutex_enter(&rmp->rm_lock);
 778                         if (rmp->rm_state & RM_CLOSING) {
 779                                 mutex_exit(&rmp->rm_lock);
 780                                 putnext(q, mp);
 781                                 break;
 782                         }
 783 
 784                         switch (rmp->rm_type) {
 785                         case RPC_CLIENT:
 786                                 mutex_exit(&rmp->rm_lock);
 787                                 hdrsz = mp->b_wptr - mp->b_rptr;
 788 
 789                                 /*
 790                                  * Make sure the header is sane.
 791                                  */
 792                                 if (hdrsz < TUNITDATAINDSZ ||
 793                                     hdrsz < (pptr->unitdata_ind.OPT_length +
 794                                     pptr->unitdata_ind.OPT_offset) ||
 795                                     hdrsz < (pptr->unitdata_ind.SRC_length +
 796                                     pptr->unitdata_ind.SRC_offset)) {
 797                                         freemsg(mp);
 798                                         return;
 799                                 }
 800 
 801                                 /*
 802                                  * Call clnt_clts_dispatch_notify, so that it
 803                                  * can pass the message to the proper caller.
 804                                  * Don't discard the header just yet since the
 805                                  * client may need the sender's address.
 806                                  */
 807                                 clnt_clts_dispatch_notify(mp, hdrsz,
 808                                     rmp->rm_zoneid);
 809                                 return;
 810                         case RPC_SERVER:
 811                                 /*
 812                                  * rm_krpc_cell is exclusively used by the kRPC
 813                                  * CLTS server
 814                                  */
 815                                 if (rmp->rm_krpc_cell) {
 816 #ifdef DEBUG
 817                                         /*
 818                                          * Test duplicate request cache and
 819                                          * rm_ref count handling by sending a
 820                                          * duplicate every so often, if
 821                                          * desired.
 822                                          */
 823                                         if (rpcmod_send_dup &&
 824                                             rpcmod_send_dup_cnt++ %
 825                                             rpcmod_send_dup)
 826                                                 nmp = copymsg(mp);
 827                                         else
 828                                                 nmp = NULL;
 829 #endif
 830                                         /*
 831                                          * Raise the reference count on this
 832                                          * module to prevent it from being
 833                                          * popped before krpc generates the
 834                                          * reply.
 835                                          */
 836                                         rmp->rm_ref++;
 837                                         mutex_exit(&rmp->rm_lock);
 838 
 839                                         /*
 840                                          * Submit the message to krpc.
 841                                          */
 842                                         svc_queuereq(q, mp);
 843 #ifdef DEBUG
 844                                         /*
 845                                          * Send duplicate if we created one.
 846                                          */
 847                                         if (nmp) {
 848                                                 mutex_enter(&rmp->rm_lock);
 849                                                 rmp->rm_ref++;
 850                                                 mutex_exit(&rmp->rm_lock);
 851                                                 svc_queuereq(q, nmp);
 852                                         }
 853 #endif
 854                                 } else {
 855                                         mutex_exit(&rmp->rm_lock);
 856                                         freemsg(mp);
 857                                 }
 858                                 return;
 859                         default:
 860                                 mutex_exit(&rmp->rm_lock);
 861                                 freemsg(mp);
 862                                 return;
 863                         } /* end switch(rmp->rm_type) */
 864                 } else if (pptr->type == T_UDERROR_IND) {
 865                         mutex_enter(&rmp->rm_lock);
 866                         hdrsz = mp->b_wptr - mp->b_rptr;
 867 
 868                         /*
 869                          * Make sure the header is sane
 870                          */
 871                         if (hdrsz < TUDERRORINDSZ ||
 872                             hdrsz < (pptr->uderror_ind.OPT_length +
 873                             pptr->uderror_ind.OPT_offset) ||
 874                             hdrsz < (pptr->uderror_ind.DEST_length +
 875                             pptr->uderror_ind.DEST_offset)) {
 876                                 mutex_exit(&rmp->rm_lock);
 877                                 freemsg(mp);
 878                                 return;
 879                         }
 880 
 881                         /*
 882                          * In the case where a unit data error has been
 883                          * received, all we need to do is clear the message from
 884                          * the queue.
 885                          */
 886                         mutex_exit(&rmp->rm_lock);
 887                         freemsg(mp);
 888                         RPCLOG(32, "rpcmodrput: unitdata error received at "
 889                             "%ld\n", gethrestime_sec());
 890                         return;
 891                 } /* end else if (pptr->type == T_UDERROR_IND) */
 892 
 893                 putnext(q, mp);
 894                 break;
 895         } /* end switch (mp->b_datap->db_type) */
 896 
 897         TRACE_0(TR_FAC_KRPC, TR_RPCMODRPUT_END,
 898             "rpcmodrput_end:");
 899         /*
 900          * Return codes are not looked at by the STREAMS framework.
 901          */
 902 }
 903 
 904 /*
 905  * write put procedure
 906  */
 907 void
 908 rpcmodwput(queue_t *q, mblk_t *mp)
 909 {
 910         struct rpcm     *rmp;
 911 
 912         ASSERT(q != NULL);
 913 
 914         switch (mp->b_datap->db_type) {
 915                 case M_PROTO:
 916                 case M_PCPROTO:
 917                         break;
 918                 default:
 919                         rpcmodwput_other(q, mp);
 920                         return;
 921         }
 922 
 923         /*
 924          * Check to see if we can send the message downstream.
 925          */
 926         if (canputnext(q)) {
 927                 putnext(q, mp);
 928                 return;
 929         }
 930 
 931         rmp = (struct rpcm *)q->q_ptr;
 932         ASSERT(rmp != NULL);
 933 
 934         /*
 935          * The first canputnext failed.  Try again except this time with the
 936          * lock held, so that we can check the state of the stream to see if
 937          * it is closing.  If either of these conditions evaluate to true
 938          * then send the meesage.
 939          */
 940         mutex_enter(&rmp->rm_lock);
 941         if (canputnext(q) || (rmp->rm_state & RM_CLOSING)) {
 942                 mutex_exit(&rmp->rm_lock);
 943                 putnext(q, mp);
 944         } else {
 945                 /*
 946                  * canputnext failed again and the stream is not closing.
 947                  * Place the message on the queue and let the service
 948                  * procedure handle the message.
 949                  */
 950                 mutex_exit(&rmp->rm_lock);
 951                 (void) putq(q, mp);
 952         }
 953 }
 954 
 955 static void
 956 rpcmodwput_other(queue_t *q, mblk_t *mp)
 957 {
 958         struct rpcm     *rmp;
 959         struct iocblk   *iocp;
 960 
 961         rmp = (struct rpcm *)q->q_ptr;
 962         ASSERT(rmp != NULL);
 963 
 964         switch (mp->b_datap->db_type) {
 965                 case M_IOCTL:
 966                         iocp = (struct iocblk *)mp->b_rptr;
 967                         ASSERT(iocp != NULL);
 968                         switch (iocp->ioc_cmd) {
 969                                 case RPC_CLIENT:
 970                                 case RPC_SERVER:
 971                                         mutex_enter(&rmp->rm_lock);
 972                                         rmp->rm_type = iocp->ioc_cmd;
 973                                         mutex_exit(&rmp->rm_lock);
 974                                         mp->b_datap->db_type = M_IOCACK;
 975                                         qreply(q, mp);
 976                                         return;
 977                                 default:
 978                                 /*
 979                                  * pass the ioctl downstream and hope someone
 980                                  * down there knows how to handle it.
 981                                  */
 982                                         putnext(q, mp);
 983                                         return;
 984                         }
 985                 default:
 986                         break;
 987         }
 988         /*
 989          * This is something we definitely do not know how to handle, just
 990          * pass the message downstream
 991          */
 992         putnext(q, mp);
 993 }
 994 
 995 /*
 996  * Module write service procedure. This is called by downstream modules
 997  * for back enabling during flow control.
 998  */
 999 void
1000 rpcmodwsrv(queue_t *q)
1001 {
1002         struct rpcm     *rmp;
1003         mblk_t          *mp = NULL;
1004 
1005         rmp = (struct rpcm *)q->q_ptr;
1006         ASSERT(rmp != NULL);
1007 
1008         /*
1009          * Get messages that may be queued and send them down stream
1010          */
1011         while ((mp = getq(q)) != NULL) {
1012                 /*
1013                  * Optimize the service procedure for the server-side, by
1014                  * avoiding a call to canputnext().
1015                  */
1016                 if (rmp->rm_type == RPC_SERVER || canputnext(q)) {
1017                         putnext(q, mp);
1018                         continue;
1019                 }
1020                 (void) putbq(q, mp);
1021                 return;
1022         }
1023 }
1024 
1025 static void
1026 rpcmod_release(queue_t *q, mblk_t *bp)
1027 {
1028         struct rpcm *rmp;
1029 
1030         /*
1031          * For now, just free the message.
1032          */
1033         if (bp)
1034                 freemsg(bp);
1035         rmp = (struct rpcm *)q->q_ptr;
1036 
1037         mutex_enter(&rmp->rm_lock);
1038         rmp->rm_ref--;
1039 
1040         if (rmp->rm_ref == 0 && (rmp->rm_state & RM_CLOSING)) {
1041                 cv_broadcast(&rmp->rm_cwait);
1042         }
1043 
1044         mutex_exit(&rmp->rm_lock);
1045 }
1046 
/*
 * This part of rpcmod is pushed on a connection-oriented transport for use
 * by RPC.  It serves to bypass the Stream head, implements
 * the record marking protocol, and dispatches incoming RPC messages.
 */

/* Default idle timer values, in milliseconds */
#define MIR_CLNT_IDLE_TIMEOUT   (5 * (60 * 1000L))      /* 5 minutes */
#define MIR_SVC_IDLE_TIMEOUT    (6 * (60 * 1000L))      /* 6 minutes */
#define MIR_SVC_ORDREL_TIMEOUT  (10 * (60 * 1000L))     /* 10 minutes */
#define MIR_LASTFRAG    0x80000000      /* Record marker */

/*
 * Data length of message mp: msgdsize() when the message has continuation
 * blocks, otherwise simply the span between b_rptr and b_wptr.
 * The argument is parenthesized so that any pointer expression may be
 * passed; it is evaluated more than once.
 */
#define DLEN(mp)        \
        ((mp)->b_cont ? msgdsize(mp) : ((mp)->b_wptr - (mp)->b_rptr))

/*
 * True when the stream holds no references and is not in its read
 * service routine.  The argument is parenthesized for the same reason as
 * in DLEN() and is evaluated twice.
 */
#define MIR_SVC_QUIESCED(mir)   \
        ((mir)->mir_ref_cnt == 0 && (mir)->mir_inrservice == 0)

/*
 * Clear mir_inrservice and, on an RPC_SERVER stream that is closing,
 * signal mir_condvar.  Expands to a brace-enclosed block.
 */
#define MIR_CLEAR_INRSRV(mir_ptr)       {       \
        (mir_ptr)->mir_inrservice = 0;       \
        if ((mir_ptr)->mir_type == RPC_SERVER &&     \
                (mir_ptr)->mir_closing)      \
                cv_signal(&(mir_ptr)->mir_condvar);      \
}

/*
 * Don't block service procedure (and mir_close) if
 * we are in the process of closing.
 */
#define MIR_WCANPUTNEXT(mir_ptr, write_q)       \
        (canputnext(write_q) || ((mir_ptr)->mir_svc_no_more_msgs == 1))
1077 
1078 static int      mir_clnt_dup_request(queue_t *q, mblk_t *mp);
1079 static void     mir_rput_proto(queue_t *q, mblk_t *mp);
1080 static int      mir_svc_policy_notify(queue_t *q, int event);
1081 static void     mir_svc_release(queue_t *wq, mblk_t *mp);
1082 static void     mir_svc_start(queue_t *wq);
1083 static void     mir_svc_idle_start(queue_t *, mir_t *);
1084 static void     mir_svc_idle_stop(queue_t *, mir_t *);
1085 static void     mir_svc_start_close(queue_t *, mir_t *);
1086 static void     mir_clnt_idle_do_stop(queue_t *);
1087 static void     mir_clnt_idle_stop(queue_t *, mir_t *);
1088 static void     mir_clnt_idle_start(queue_t *, mir_t *);
1089 static void     mir_wput(queue_t *q, mblk_t *mp);
1090 static void     mir_wput_other(queue_t *q, mblk_t *mp);
1091 static void     mir_wsrv(queue_t *q);
1092 static  void    mir_disconnect(queue_t *, mir_t *ir);
1093 static  int     mir_check_len(queue_t *, int32_t, mblk_t *);
1094 static  void    mir_timer(void *);
1095 
1096 extern void     (*mir_rele)(queue_t *, mblk_t *);
1097 extern void     (*mir_start)(queue_t *);
1098 extern void     (*clnt_stop_idle)(queue_t *);
1099 
1100 clock_t clnt_idle_timeout = MIR_CLNT_IDLE_TIMEOUT;
1101 clock_t svc_idle_timeout = MIR_SVC_IDLE_TIMEOUT;
1102 
1103 /*
1104  * Timeout for subsequent notifications of idle connection.  This is
1105  * typically used to clean up after a wedged orderly release.
1106  */
1107 clock_t svc_ordrel_timeout = MIR_SVC_ORDREL_TIMEOUT; /* milliseconds */
1108 
1109 extern  uint_t  *clnt_max_msg_sizep;
1110 extern  uint_t  *svc_max_msg_sizep;
1111 uint_t  clnt_max_msg_size = RPC_MAXDATASIZE;
1112 uint_t  svc_max_msg_size = RPC_MAXDATASIZE;
1113 uint_t  mir_krpc_cell_null;
1114 
1115 uint32_t        cb_live = 0;
1116 
1117 static void
1118 mir_callback_thread(SVCCB *svc_cb)
1119 {
1120         callb_cpr_t     cprinfo;
1121         kmutex_t        cpr_lock;
1122         SVCXPRT         *clone_xprt;
1123         mblk_t          *mp;
1124         struct rpc_msg  msg;
1125         struct svc_req  r;
1126         char            *cred_area;
1127         int             rqcred_size = 400;      /* RQCRED_SIZE */
1128         SVC_DISPATCH    *svc_nfs41_co = svc_cb->r_dispatch;
1129 
1130         mutex_init(&cpr_lock, NULL, MUTEX_DEFAULT, NULL);
1131         CALLB_CPR_INIT(&cprinfo, &cpr_lock, callb_generic_cpr,
1132             "mir_callback_thread");
1133 
1134         mutex_enter(&svc_cb->r_lock);
1135 
1136         while (!(svc_cb->r_flags & SVCCB_NFS41_CB_THREAD_EXIT)) {
1137                 mutex_enter(&cpr_lock);
1138                 CALLB_CPR_SAFE_BEGIN(&cprinfo);
1139                 mutex_exit(&cpr_lock);
1140 
1141                 cv_wait(&svc_cb->r_cbwait, &svc_cb->r_lock);
1142 
1143                 mutex_enter(&cpr_lock);
1144                 CALLB_CPR_SAFE_END(&cprinfo, &cpr_lock);
1145                 mutex_exit(&cpr_lock);
1146 
1147                 if (svc_cb->r_flags & SVCCB_NFS41_CB_THREAD_EXIT)
1148                         break;
1149 
1150                 mp = svc_cb->r_mp;
1151                 svc_cb->r_mp = NULL;
1152                 clone_xprt = svc_clone_init();
1153 
1154                 svc_init_clone_xprt(clone_xprt, svc_cb->r_q);
1155                 clone_xprt->xp_master = NULL;
1156                 clone_xprt->xp_msg_size = 2048; /* COTS_MAX_ALLOCSIZE */
1157                 cred_area = kmem_zalloc(2 * MAX_AUTH_BYTES + rqcred_size,
1158                     KM_SLEEP);
1159                 msg.rm_call.cb_cred.oa_base = cred_area;
1160                 msg.rm_call.cb_verf.oa_base = &(cred_area[MAX_AUTH_BYTES]);
1161                 r.rq_clntcred = &(cred_area[2 * MAX_AUTH_BYTES]);
1162 
1163                 /*
1164                  * underlying transport recv routine may modify mblk data
1165                  * and make it difficult to extract label afterwards. So
1166                  * get the label from the raw mblk data now.
1167                  */
1168                 if (is_system_labeled()) {
1169                         mblk_t *lmp;
1170 
1171                         r.rq_label = kmem_alloc(sizeof (bslabel_t), KM_NOSLEEP);
1172                         if (r.rq_label == NULL) {
1173                                 freemsg(mp);
1174                                 continue;
1175                         }
1176                         if (DB_CRED(mp) != NULL)
1177                                 lmp = mp;
1178                         else {
1179                                 ASSERT(mp->b_cont != NULL);
1180                                 lmp = mp->b_cont;
1181                                 ASSERT(DB_CRED(lmp) != NULL);
1182                         }
1183                         bcopy(label2bslabel(crgetlabel(DB_CRED(lmp))),
1184                             r.rq_label, sizeof (bslabel_t));
1185                 } else {
1186                         r.rq_label = NULL;
1187                 }
1188 
1189                 /*
1190                  * Now receive the message.
1191                  */
1192                 if (SVC_RECV(clone_xprt, mp, &msg)) {
1193                         void (*dispatchroutine) (struct svc_req *, SVCXPRT *);
1194                         bool_t no_dispatch;
1195                         enum auth_stat why;
1196 
1197                         /*
1198                          * Find the registered program and call its
1199                          * dispatch routine.
1200                          */
1201                         r.rq_xprt = clone_xprt;
1202                         r.rq_prog = msg.rm_call.cb_prog;
1203                         r.rq_vers = msg.rm_call.cb_vers;
1204                         r.rq_proc = msg.rm_call.cb_proc;
1205                         r.rq_cred = msg.rm_call.cb_cred;
1206 
1207                         if ((why = sec_svc_msg(&r, &msg, &no_dispatch)) !=
1208                             AUTH_OK) {
1209                                 svcerr_auth(clone_xprt, why);
1210                                 (void) SVC_FREEARGS(clone_xprt, NULL, NULL);
1211                         } else if (no_dispatch) {
1212                                 (void) SVC_FREEARGS(clone_xprt, NULL, NULL);
1213                         } else {
1214                                 dispatchroutine = svc_nfs41_co;
1215                                 (*dispatchroutine) (&r, clone_xprt);
1216                                 (void) SVC_FREEARGS(clone_xprt, NULL, NULL);
1217                         }
1218                         if (r.rq_cred.oa_flavor == RPCSEC_GSS)
1219                                 rpc_gss_cleanup(clone_xprt);
1220                 }
1221                 if (r.rq_label != NULL)
1222                         kmem_free(r.rq_label, sizeof (bslabel_t));
1223         }
1224 
1225         mutex_exit(&svc_cb->r_lock);
1226         cv_signal(&svc_cb->r_cbexit);
1227 
1228         mutex_enter(&cpr_lock);
1229         CALLB_CPR_EXIT(&cprinfo);
1230 
1231         zthread_exit();
1232 }
1233 
1234 void
1235 mir_set_cbinfo(queue_t *wq, void *info)
1236 {
1237 
1238         CBSERVER_ARGS   *cbsrv_args = (CBSERVER_ARGS *)info;
1239         mir_t   *mir = (mir_t *)wq->q_ptr;
1240         SVCCB *scb = mir->mir_cb;
1241 
1242         if (scb != NULL) {
1243                 /* shouldn't this be an ASSERT? */




1244                 cmn_err(CE_WARN, "mir_set_cbinfo: scb != NULL");
1245                 kmem_free(scb, sizeof (SVCCB));

















1246                 mir->mir_cb = NULL;
1247         }
1248 
1249         scb = kmem_zalloc(sizeof (SVCCB), KM_SLEEP);
1250         mutex_init(&scb->r_lock, NULL, MUTEX_DEFAULT, NULL);
1251         cv_init(&scb->r_cbwait, NULL, CV_DEFAULT, NULL);
1252         cv_init(&scb->r_cbexit, NULL, CV_DEFAULT, NULL);
1253         scb->r_prog = cbsrv_args->prog;
1254         scb->r_dispatch = cbsrv_args->callback;
1255         scb->r_q = wq;
1256         mir->mir_cb = scb;
1257 
1258         scb->r_thread =
1259             zthread_create(NULL, 0, mir_callback_thread, scb, 0, minclsyspri);
1260         ASSERT(scb->r_thread != NULL);
1261 }
1262 
1263 void
1264 mir_clear_cbinfo(queue_t *wq)
1265 {
1266         mir_t   *mir = (mir_t *)wq->q_ptr;
1267         SVCCB   *scb;
1268 
1269         mutex_enter(&mir->mir_mutex);
1270         scb = mir->mir_cb;
1271         if (scb == NULL) {
1272                 mutex_exit(&mir->mir_mutex);
1273                 return;



1274         }
1275 
1276         mir->mir_cb = NULL;
1277         mutex_exit(&mir->mir_mutex);
1278 
1279         mutex_enter(&scb->r_lock);
1280         scb->r_flags |= SVCCB_NFS41_CB_THREAD_EXIT;
1281         cv_signal(&scb->r_cbwait);
1282 
1283         cv_wait(&scb->r_cbexit, &scb->r_lock);
1284         mutex_exit(&scb->r_lock);
1285         mutex_destroy(&scb->r_lock);
1286         cv_destroy(&scb->r_cbwait);
1287         cv_destroy(&scb->r_cbexit);
1288         kmem_free(scb, sizeof (SVCCB));
1289 }
1290 
1291 void
1292 mir_check_cb(void *handlecb, queue_t *wq)
1293 {
1294         ASSERT(handlecb == ((mir_t *)wq->q_ptr)->mir_cb);
1295 }
1296 
1297 
1298 SVCCB *
1299 mir_get_svccb(queue_t *wq)
1300 {
1301         mir_t   *mir;
1302         mir = (mir_t *)wq->q_ptr;
1303         return ((SVCCB *)mir->mir_cb);
1304 }
1305 
1306 static void
1307 mir_timer_stop(mir_t *mir)
1308 {
1309         timeout_id_t tid;
1310 
1311         ASSERT(MUTEX_HELD(&mir->mir_mutex));
1312 
1313         /*
1314          * Since the mir_mutex lock needs to be released to call
1315          * untimeout(), we need to make sure that no other thread
1316          * can start/stop the timer (changing mir_timer_id) during
1317          * that time.  The mir_timer_call bit and the mir_timer_cv
1318          * condition variable are used to synchronize this.  Setting
1319          * mir_timer_call also tells mir_timer() (refer to the comments
1320          * in mir_timer()) that it does not need to do anything.
1321          */
1322         while (mir->mir_timer_call)
1323                 cv_wait(&mir->mir_timer_cv, &mir->mir_mutex);
1324         mir->mir_timer_call = B_TRUE;
1325 
1326         if ((tid = mir->mir_timer_id) != 0) {
1327                 mir->mir_timer_id = 0;
1328                 mutex_exit(&mir->mir_mutex);
1329                 (void) untimeout(tid);
1330                 mutex_enter(&mir->mir_mutex);
1331         }
1332         mir->mir_timer_call = B_FALSE;
1333         cv_broadcast(&mir->mir_timer_cv);
1334 }
1335 
1336 static void
1337 mir_timer_start(queue_t *q, mir_t *mir, clock_t intrvl)
1338 {
1339         timeout_id_t tid;
1340 
1341         ASSERT(MUTEX_HELD(&mir->mir_mutex));
1342 
1343         while (mir->mir_timer_call)
1344                 cv_wait(&mir->mir_timer_cv, &mir->mir_mutex);
1345         mir->mir_timer_call = B_TRUE;
1346 
1347         if ((tid = mir->mir_timer_id) != 0) {
1348                 mutex_exit(&mir->mir_mutex);
1349                 (void) untimeout(tid);
1350                 mutex_enter(&mir->mir_mutex);
1351         }
1352         /* Only start the timer when it is not closing. */
1353         if (!mir->mir_closing) {
1354                 mir->mir_timer_id = timeout(mir_timer, q,
1355                     MSEC_TO_TICK(intrvl));
1356         }
1357         mir->mir_timer_call = B_FALSE;
1358         cv_broadcast(&mir->mir_timer_cv);
1359 }
1360 
1361 static int
1362 mir_clnt_dup_request(queue_t *q, mblk_t *mp)
1363 {
1364         mblk_t  *mp1;
1365         uint32_t  new_xid;
1366         uint32_t  old_xid;
1367 
1368         ASSERT(MUTEX_HELD(&((mir_t *)q->q_ptr)->mir_mutex));
1369         new_xid = BE32_TO_U32(&mp->b_rptr[4]);
1370         /*
1371          * This loop is a bit tacky -- it walks the STREAMS list of
1372          * flow-controlled messages.
1373          */
1374         if ((mp1 = q->q_first) != NULL) {
1375                 do {
1376                         old_xid = BE32_TO_U32(&mp1->b_rptr[4]);
1377                         if (new_xid == old_xid)
1378                                 return (1);
1379                 } while ((mp1 = mp1->b_next) != NULL);
1380         }
1381         return (0);
1382 }
1383 
1384 static int
1385 mir_close(queue_t *q)
1386 {
1387         mir_t   *mir = q->q_ptr;
1388         mblk_t  *mp;
1389         bool_t queue_cleaned = FALSE;
1390 
1391         RPCLOG(32, "rpcmod: mir_close of q 0x%p\n", (void *)q);
1392         ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
1393         mutex_enter(&mir->mir_mutex);
1394         if ((mp = mir->mir_head_mp) != NULL) {
1395                 mir->mir_head_mp = NULL;
1396                 mir->mir_tail_mp = NULL;
1397                 freemsg(mp);
1398         }
1399         /*
1400          * Set mir_closing so we get notified when MIR_SVC_QUIESCED()
1401          * is TRUE.  And mir_timer_start() won't start the timer again.
1402          */
1403         mir->mir_closing = B_TRUE;
1404         mir_timer_stop(mir);
1405 
1406         if (mir->mir_type == RPC_SERVER) {
1407                 flushq(q, FLUSHDATA);   /* Ditch anything waiting on read q */
1408 
1409                 /*
1410                  * This will prevent more requests from arriving and
1411                  * will force rpcmod to ignore flow control.
1412                  */
1413                 mir_svc_start_close(WR(q), mir);
1414 
1415                 while ((!MIR_SVC_QUIESCED(mir)) || mir->mir_inwservice == 1) {
1416 
1417                         if (mir->mir_ref_cnt && !mir->mir_inrservice &&
1418                             (queue_cleaned == FALSE)) {
1419                                 /*
1420                                  * call into SVC to clean the queue
1421                                  */
1422                                 mutex_exit(&mir->mir_mutex);
1423                                 svc_queueclean(q);
1424                                 queue_cleaned = TRUE;
1425                                 mutex_enter(&mir->mir_mutex);
1426                                 continue;
1427                         }
1428 
1429                         /*
1430                          * Bugid 1253810 - Force the write service
1431                          * procedure to send its messages, regardless
1432                          * whether the downstream  module is ready
1433                          * to accept data.
1434                          */
1435                         if (mir->mir_inwservice == 1)
1436                                 qenable(WR(q));
1437 
1438                         cv_wait(&mir->mir_condvar, &mir->mir_mutex);
1439                 }
1440 
1441                 mutex_exit(&mir->mir_mutex);
1442                 /*
1443                  * Destroy the cm_entry
1444                  */
1445                 connmgr_cb_destroy(WR(q));
1446                 qprocsoff(q);
1447 
1448                 /* Notify KRPC that this stream is going away. */
1449                 svc_queueclose(q);
1450         } else {
1451                 mutex_exit(&mir->mir_mutex);
1452                 qprocsoff(q);
1453         }
1454 
1455         mutex_destroy(&mir->mir_mutex);
1456         cv_destroy(&mir->mir_condvar);
1457         cv_destroy(&mir->mir_timer_cv);
1458         kmem_free(mir, sizeof (mir_t));
1459         return (0);
1460 }
1461 
1462 /*
1463  * This is server side only (RPC_SERVER).
1464  *
1465  * Exit idle mode.
1466  */
1467 static void
1468 mir_svc_idle_stop(queue_t *q, mir_t *mir)
1469 {
1470         ASSERT(MUTEX_HELD(&mir->mir_mutex));
1471         ASSERT((q->q_flag & QREADR) == 0);
1472         ASSERT(mir->mir_type == RPC_SERVER);
1473         RPCLOG(16, "rpcmod: mir_svc_idle_stop of q 0x%p\n", (void *)q);
1474 
1475         mir_timer_stop(mir);
1476 }
1477 
1478 /*
1479  * This is server side only (RPC_SERVER).
1480  *
1481  * Start idle processing, which will include setting idle timer if the
1482  * stream is not being closed.
1483  */
1484 static void
1485 mir_svc_idle_start(queue_t *q, mir_t *mir)
1486 {
1487         ASSERT(MUTEX_HELD(&mir->mir_mutex));
1488         ASSERT((q->q_flag & QREADR) == 0);
1489         ASSERT(mir->mir_type == RPC_SERVER);
1490         RPCLOG(16, "rpcmod: mir_svc_idle_start q 0x%p\n", (void *)q);
1491 
1492         /*
1493          * Don't re-start idle timer if we are closing queues.
1494          */
1495         if (mir->mir_closing) {
1496                 RPCLOG(16, "mir_svc_idle_start - closing: 0x%p\n",
1497                     (void *)q);
1498 
1499                 /*
1500                  * We will call mir_svc_idle_start() whenever MIR_SVC_QUIESCED()
1501                  * is true.  When it is true, and we are in the process of
1502                  * closing the stream, signal any thread waiting in
1503                  * mir_close().
1504                  */
1505                 if (mir->mir_inwservice == 0)
1506                         cv_signal(&mir->mir_condvar);
1507 
1508         } else {
1509                 RPCLOG(16, "mir_svc_idle_start - reset %s timer\n",
1510                     mir->mir_ordrel_pending ? "ordrel" : "normal");
1511                 /*
1512                  * Normal condition, start the idle timer.  If an orderly
1513                  * release has been sent, set the timeout to wait for the
1514                  * client to close its side of the connection.  Otherwise,
1515                  * use the normal idle timeout.
1516                  */
1517                 mir_timer_start(q, mir, mir->mir_ordrel_pending ?
1518                     svc_ordrel_timeout : mir->mir_idle_timeout);
1519         }
1520 }
1521 
1522 /* ARGSUSED */
1523 static int
1524 mir_open(queue_t *q, dev_t *devp, int flag, int sflag, cred_t *credp)
1525 {
1526         mir_t   *mir;
1527 
1528         RPCLOG(32, "rpcmod: mir_open of q 0x%p\n", (void *)q);
1529         /* Set variables used directly by KRPC. */
1530         if (!mir_rele)
1531                 mir_rele = mir_svc_release;
1532         if (!mir_start)
1533                 mir_start = mir_svc_start;
1534         if (!clnt_stop_idle)
1535                 clnt_stop_idle = mir_clnt_idle_do_stop;
1536         if (!clnt_max_msg_sizep)
1537                 clnt_max_msg_sizep = &clnt_max_msg_size;
1538         if (!svc_max_msg_sizep)
1539                 svc_max_msg_sizep = &svc_max_msg_size;
1540 
1541         /* Allocate a zero'ed out mir structure for this stream. */
1542         mir = kmem_zalloc(sizeof (mir_t), KM_SLEEP);
1543 
1544         /*
1545          * We set hold inbound here so that incoming messages will
1546          * be held on the read-side queue until the stream is completely
1547          * initialized with a RPC_CLIENT or RPC_SERVER ioctl.  During
1548          * the ioctl processing, the flag is cleared and any messages that
1549          * arrived between the open and the ioctl are delivered to KRPC.
1550          *
1551          * Early data should never arrive on a client stream since
1552          * servers only respond to our requests and we do not send any.
1553          * until after the stream is initialized.  Early data is
1554          * very common on a server stream where the client will start
1555          * sending data as soon as the connection is made (and this
1556          * is especially true with TCP where the protocol accepts the
1557          * connection before nfsd or KRPC is notified about it).
1558          */
1559 
1560         mir->mir_hold_inbound = 1;
1561 
1562         /*
1563          * Start the record marker looking for a 4-byte header.  When
1564          * this length is negative, it indicates that rpcmod is looking
1565          * for bytes to consume for the record marker header.  When it
1566          * is positive, it holds the number of bytes that have arrived
1567          * for the current fragment and are being held in mir_header_mp.
1568          */
1569 
1570         mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
1571 
1572         mir->mir_zoneid = rpc_zoneid();
1573         mutex_init(&mir->mir_mutex, NULL, MUTEX_DEFAULT, NULL);
1574         cv_init(&mir->mir_condvar, NULL, CV_DRIVER, NULL);
1575         cv_init(&mir->mir_timer_cv, NULL, CV_DRIVER, NULL);
1576 
1577         q->q_ptr = (char *)mir;
1578         WR(q)->q_ptr = (char *)mir;
1579 
1580         /*
1581          * We noenable the read-side queue because we don't want it
1582          * automatically enabled by putq.  We enable it explicitly
1583          * in mir_wsrv when appropriate. (See additional comments on
1584          * flow control at the beginning of mir_rsrv.)
1585          */
1586         noenable(q);
1587 
1588         qprocson(q);
1589         return (0);
1590 }
1591 void
1592 mir_queue_rele(queue_t *q)
1593 {
1594         mir_t   *mir;
1595 
1596         ASSERT(q != NULL);
1597         mir = (mir_t *)q->q_ptr;
1598         ASSERT(mir != NULL);
1599 
1600         mutex_enter(&mir->mir_mutex);
1601         mir->mir_ref_cnt--;
1602         mutex_exit(&mir->mir_mutex);
1603 }
1604 
1605 void
1606 mir_queue_hold(queue_t *q)
1607 {
1608         mir_t   *mir;
1609 
1610         ASSERT(q != NULL);
1611         mir = (mir_t *)q->q_ptr;
1612         ASSERT(mir != NULL);
1613 
1614         mutex_enter(&mir->mir_mutex);
1615         mir->mir_ref_cnt++;
1616         mutex_exit(&mir->mir_mutex);
1617 }
1618 
1619 /*
1620  * Copy out the RPC transaction id and RPC Direction
1621  * from the mblk chain. Leave the mblk intact.
1622  */
1623 bool_t
1624 mir_dir_xid(mblk_t *mp, uint32_t *dir, uint32_t *xid)
1625 {
1626         unsigned char   *p;
1627         unsigned char   *rptr;
1628         mblk_t          *tmp;
1629         int              i, get_rpcdir;
1630         uint32_t         d_tmp = 0;
1631 
1632         /*
1633          * If we can just grab the XID and RPC direction flag great.
1634          */
1635         if ((IS_P2ALIGNED(mp->b_rptr, (sizeof (uint64_t)))) &&
1636             (mp->b_wptr - mp->b_rptr) >= (sizeof (uint64_t))) {
1637                 *xid = *((uint32_t *)mp->b_rptr);
1638                 *dir = ntohl(*((uint32_t *)(mp->b_rptr + sizeof (uint32_t))));
1639                 return (TRUE);
1640         }
1641 
1642         /*
1643          * Otherwise we need to copy byte-by-byte
1644          */
1645         DTRACE_PROBE(krpc__i__bytecopy);
1646 
1647         i = get_rpcdir = 0;
1648         p = (unsigned char *)xid;
1649         tmp = mp;
1650 
1651         /*
1652          * While we have not exhausted the entire mblk chain:
1653          * copy the first sizeof uint32_t value into xid, and
1654          * then the second sizeof uint32_t value into a temporary
1655          * so that we can convert from network byte order.
1656          *
1657          * Should we exhaust the entire mblk chain in attempting
1658          * to do this, return FALSE.
1659          */
1660         while (tmp) {
1661                 rptr = tmp->b_rptr;
1662                 while (rptr < tmp->b_wptr) {
1663                         *p++ = *rptr++;
1664                         /*
1665                          * Have we collected enough bytes for
1666                          * a uint32_t ?
1667                          */
1668                         if (++i == sizeof (uint32_t)) {
1669                                 /*
1670                                  * If yes, do we need to switch to
1671                                  * RPC Direction or are we all done ?
1672                                  */
1673                                 if (get_rpcdir) {
1674                                         /* Got it all */
1675                                         *dir = ntohl(d_tmp);
1676                                         return (TRUE);
1677                                 }
1678                                 /* start to collect RPC Direction */
1679                                 get_rpcdir++;
1680                                 i = 0;
1681                                 p = (unsigned char *)&d_tmp;
1682                         }
1683                 }
1684                 tmp = tmp->b_cont;
1685         }
1686 
1687         /* We didn't get both of them.. */
1688         DTRACE_PROBE(krpc__e__mblk_exhausted);
1689         return (FALSE);
1690 }
1691 
1692 /*
1693  * Read-side put routine for both the client and server side.  Does the
1694  * record marking for incoming RPC messages, and when complete, dispatches
1695  * the message to either the client or server.
1696  */
1697 static void
1698 mir_rput(queue_t *q, mblk_t *mp)
1699 {
1700         int     excess;
1701         int32_t frag_len, frag_header;
1702         mblk_t  *cont_mp, *head_mp, *tail_mp, *mp1;
1703         mir_t   *mir = q->q_ptr;
1704         boolean_t stop_timer = B_FALSE;
1705         uint32_t        xid;
1706         uint32_t        dir;
1707 
1708         ASSERT(mir != NULL);
1709 
1710         /*
1711          * If the stream has not been set up as a RPC_CLIENT or RPC_SERVER
1712          * with the corresponding ioctl, then don't accept
1713          * any inbound data.  This should never happen for streams
1714          * created by nfsd or client-side KRPC because they are careful
1715          * to set the mode of the stream before doing anything else.
1716          */
1717         if (mir->mir_type == 0) {
1718                 freemsg(mp);
1719                 return;
1720         }
1721 
1722         ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
1723 
1724         switch (mp->b_datap->db_type) {
1725         case M_DATA:
1726                 break;
1727         case M_PROTO:
1728         case M_PCPROTO:
1729                 if (MBLKL(mp) < sizeof (t_scalar_t)) {
1730                         RPCLOG(1, "mir_rput: runt TPI message (%d bytes)\n",
1731                             (int)MBLKL(mp));
1732                         freemsg(mp);
1733                         return;
1734                 }
1735                 if (((union T_primitives *)mp->b_rptr)->type != T_DATA_IND) {
1736                         mir_rput_proto(q, mp);
1737                         return;
1738                 }
1739 
1740                 /* Throw away the T_DATA_IND block and continue with data. */
1741                 mp1 = mp;
1742                 mp = mp->b_cont;
1743                 freeb(mp1);
1744                 break;
1745         case M_SETOPTS:
1746                 /*
1747                  * If a module on the stream is trying set the Stream head's
1748                  * high water mark, then set our hiwater to the requested
1749                  * value.  We are the "stream head" for all inbound
1750                  * data messages since messages are passed directly to KRPC.
1751                  */
1752                 if (MBLKL(mp) >= sizeof (struct stroptions)) {
1753                         struct stroptions       *stropts;
1754 
1755                         stropts = (struct stroptions *)mp->b_rptr;
1756                         if ((stropts->so_flags & SO_HIWAT) &&
1757                             !(stropts->so_flags & SO_BAND)) {
1758                                 (void) strqset(q, QHIWAT, 0, stropts->so_hiwat);
1759                         }
1760                 }
1761                 putnext(q, mp);
1762                 return;
1763         case M_FLUSH:
1764                 RPCLOG(32, "mir_rput: ignoring M_FLUSH %x ", *mp->b_rptr);
1765                 RPCLOG(32, "on q 0x%p\n", (void *)q);
1766                 putnext(q, mp);
1767                 return;
1768         default:
1769                 putnext(q, mp);
1770                 return;
1771         }
1772 
1773         mutex_enter(&mir->mir_mutex);
1774 
1775         /*
1776          * If this connection is closing, don't accept any new messages.
1777          */
1778         if (mir->mir_svc_no_more_msgs) {
1779                 ASSERT(mir->mir_type == RPC_SERVER);
1780                 mutex_exit(&mir->mir_mutex);
1781                 freemsg(mp);
1782                 return;
1783         }
1784 
1785         /* Get local copies for quicker access. */
1786         frag_len = mir->mir_frag_len;
1787         frag_header = mir->mir_frag_header;
1788         head_mp = mir->mir_head_mp;
1789         tail_mp = mir->mir_tail_mp;
1790 
1791         /* Loop, processing each message block in the mp chain separately. */
1792         do {
1793                 cont_mp = mp->b_cont;
1794                 mp->b_cont = NULL;
1795 
1796                 /*
1797                  * Drop zero-length mblks to prevent unbounded kernel memory
1798                  * consumption.
1799                  */
1800                 if (MBLKL(mp) == 0) {
1801                         freeb(mp);
1802                         continue;
1803                 }
1804 
1805                 /*
1806                  * If frag_len is negative, we're still in the process of
1807                  * building frag_header -- try to complete it with this mblk.
1808                  */
1809                 while (frag_len < 0 && mp->b_rptr < mp->b_wptr) {
1810                         frag_len++;
1811                         frag_header <<= 8;
1812                         frag_header += *mp->b_rptr++;
1813                 }
1814 
1815                 if (MBLKL(mp) == 0 && frag_len < 0) {
1816                         /*
1817                          * We consumed this mblk while trying to complete the
1818                          * fragment header.  Free it and move on.
1819                          */
1820                         freeb(mp);
1821                         continue;
1822                 }
1823 
1824                 ASSERT(frag_len >= 0);
1825 
1826                 /*
1827                  * Now frag_header has the number of bytes in this fragment
1828                  * and we're just waiting to collect them all.  Chain our
1829                  * latest mblk onto the list and see if we now have enough
1830                  * bytes to complete the fragment.
1831                  */
1832                 if (head_mp == NULL) {
1833                         ASSERT(tail_mp == NULL);
1834                         head_mp = tail_mp = mp;
1835                 } else {
1836                         tail_mp->b_cont = mp;
1837                         tail_mp = mp;
1838                 }
1839 
1840                 frag_len += MBLKL(mp);
1841                 excess = frag_len - (frag_header & ~MIR_LASTFRAG);
1842                 if (excess < 0) {
1843                         /*
1844                          * We still haven't received enough data to complete
1845                          * the fragment, so continue on to the next mblk.
1846                          */
1847                         continue;
1848                 }
1849 
1850                 /*
1851                  * We've got a complete fragment.  If there are excess bytes,
1852                  * then they're part of the next fragment's header (of either
1853                  * this RPC message or the next RPC message).  Split that part
1854                  * into its own mblk so that we can safely freeb() it when
1855                  * building frag_header above.
1856                  */
1857                 if (excess > 0) {
1858                         if ((mp1 = dupb(mp)) == NULL &&
1859                             (mp1 = copyb(mp)) == NULL) {
1860                                 freemsg(head_mp);
1861                                 freemsg(cont_mp);
1862                                 RPCLOG0(1, "mir_rput: dupb/copyb failed\n");
1863                                 mir->mir_frag_header = 0;
1864                                 mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
1865                                 mir->mir_head_mp = NULL;
1866                                 mir->mir_tail_mp = NULL;
1867                                 mir_disconnect(q, mir); /* drops mir_mutex */
1868                                 return;
1869                         }
1870 
1871                         /*
1872                          * Relink the message chain so that the next mblk is
1873                          * the next fragment header, followed by the rest of
1874                          * the message chain.
1875                          */
1876                         mp1->b_cont = cont_mp;
1877                         cont_mp = mp1;
1878 
1879                         /*
1880                          * Data in the new mblk begins at the next fragment,
1881                          * and data in the old mblk ends at the next fragment.
1882                          */
1883                         mp1->b_rptr = mp1->b_wptr - excess;
1884                         mp->b_wptr -= excess;
1885                 }
1886 
1887                 /*
1888                  * Reset frag_len and frag_header for the next fragment.
1889                  */
1890                 frag_len = -(int32_t)sizeof (uint32_t);
1891                 if (!(frag_header & MIR_LASTFRAG)) {
1892                         /*
1893                          * The current fragment is complete, but more
1894                          * fragments need to be processed before we can
1895                          * pass along the RPC message headed at head_mp.
1896                          */
1897                         frag_header = 0;
1898                         continue;
1899                 }
1900                 frag_header = 0;
1901 
1902                 /*
1903                  * Get msg direction and handle to the appropriate ctxt
1904                  */
1905                 if (!mir_dir_xid(head_mp, &dir, &xid)) {
1906                         /* XXX - if we can't get the dir, we're hosed */
1907                         mutex_exit(&mir->mir_mutex);
1908                         freemsg(head_mp);
1909                         return;
1910                 }
1911 
1912                 /*
1913                  * We've got a complete RPC message; pass it to the
1914                  * appropriate consumer.
1915                  */
1916                 switch (mir->mir_type) {
1917                 case RPC_CLIENT:
1918                         switch (dir) {
1919                         case REPLY:
1920                                 if (clnt_dispatch_notify(head_mp,
1921                                     mir->mir_zoneid, xid)) {
1922                                         /*
1923                                          * Mark this stream as active.
1924                                          * This marker is used in mir_timer().
1925                                          */
1926                                         mir->mir_clntreq = 1;
1927                                         mir->mir_use_timestamp = lbolt;
1928                                 } else
1929                                         freemsg(head_mp);
1930                                 break;
1931 
1932                         case CALL:
1933                                 /* client is now a callback server */
1934                         default:
1935                         {
1936                                 SVCCB   *svccb;
1937                                 ASSERT(dir == CALL);
1938 
1939                                 svccb = (SVCCB *)mir->mir_cb;
1940                                 if (svccb != NULL) {
1941                                         mutex_enter(&svccb->r_lock);
1942                                         if (!(svccb->r_flags &
1943                                             SVCCB_NFS41_CB_THREAD_EXIT)) {


1944                                                 svccb->r_mp = head_mp;
1945                                                 cv_signal(&svccb->r_cbwait);
1946                                         } else {
1947                                                 freemsg(head_mp);
1948                                         }
1949                                         mutex_exit(&svccb->r_lock);
1950                                 } else {
1951                                         freemsg(head_mp);
1952                                 }
1953                                 break;
1954                         }
1955 
1956                         }
1957                         break;
1958 
1959                 case RPC_SERVER:
1960                         switch (dir) {
1961                         case REPLY:
1962                                 /*
1963                                  * RPC Server initiated a Callback RPC and
1964                                  * is receiving a reply from the RPC Client.
1965                                  */
1966                                 if (clnt_dispatch_notify(head_mp,
1967                                     global_zone->zone_id, xid)) {
1968                                         /*
1969                                          * Mark this stream as active.
1970                                          * This marker is used in mir_timer().
1971                                          */
1972                                         mir->mir_clntreq = 0;
1973                                 } else
1974                                         freemsg(head_mp);
1975                                 break;
1976 
1977                         case CALL:
1978                         default:
1979                                 /*
1980                                  * Check for flow control before
1981                                  * passing the message to KRPC.
1982                                  */
1983                                 if (!mir->mir_hold_inbound) {
1984                                         if (mir->mir_krpc_cell) {
1985                                         /*
1986                                          * If the reference count is 0
1987                                          * (not including this request),
1988                                          * then the stream is transitioning
1989                                          * from idle to non-idle. In this case,
1990                                          * we cancel the idle timer.
1991                                          */
1992                                         if (mir->mir_ref_cnt++ == 0)
1993                                                 stop_timer = B_TRUE;
1994                                         if (mir_check_len(q,
1995                                             (int32_t)msgdsize(mp), mp))
1996                                                 return;
1997                                         svc_queuereq(q, head_mp); /* to KRPC */
1998                                         } else {
1999                                         /*
2000                                          * Count # of times this happens.
2001                                          * Should be never, but experience
2002                                          * shows otherwise.
2003                                          */
2004                                         mir_krpc_cell_null++;
2005                                         freemsg(head_mp);
2006                                         }
2007 
2008                                 } else {
2009                                         /*
2010                                          * If the outbound side of the stream
2011                                          * is flow controlled, then hold this
2012                                          * message until client catches up.
2013                                          * mir_hold_inbound is set in mir_wput
2014                                          * and cleared in mir_wsrv.
2015                                          */
2016                                         (void) putq(q, head_mp);
2017                                         mir->mir_inrservice = B_TRUE;
2018                                 }
2019                                 break;
2020                         }
2021                         break; /* RPC_SERVER */
2022 
2023                 default:
2024                         RPCLOG(1, "mir_rput: unknown mir_type %d\n",
2025                             mir->mir_type);
2026                         freemsg(head_mp);
2027                         break;
2028                 }
2029 
2030                 /*
2031                  * Reset the chain since we're starting on a new RPC message.
2032                  */
2033                 head_mp = tail_mp = NULL;
2034         } while ((mp = cont_mp) != NULL);
2035 
2036         /*
2037          * Sanity check the message length; if it's too large mir_check_len()
2038          * will shutdown the connection, drop mir_mutex, and return non-zero.
2039          */
2040         if (head_mp != NULL && mir->mir_setup_complete &&
2041             mir_check_len(q, frag_len, head_mp))
2042                 return;
2043 
2044         /* Save our local copies back in the mir structure. */
2045         mir->mir_frag_header = frag_header;
2046         mir->mir_frag_len = frag_len;
2047         mir->mir_head_mp = head_mp;
2048         mir->mir_tail_mp = tail_mp;
2049 
2050         /*
2051          * The timer is stopped after the whole message chain is processed.
2052          * The reason is that stopping the timer releases the mir_mutex
2053          * lock temporarily.  This means that the request can be serviced
2054          * while we are still processing the message chain.  This is not
2055          * good.  So we stop the timer here instead.
2056          *
2057          * Note that if the timer fires before we stop it, it will not
2058          * do any harm as MIR_SVC_QUIESCED() is false and mir_timer()
2059          * will just return.
2060          */
2061         if (stop_timer) {
2062                 RPCLOG(16, "mir_rput: stopping idle timer on 0x%p because "
2063                     "ref cnt going to non zero\n", (void *)WR(q));
2064                 mir_svc_idle_stop(WR(q), mir);
2065         }
2066         mutex_exit(&mir->mir_mutex);
2067 }
2068 
2069 static void
2070 mir_rput_proto(queue_t *q, mblk_t *mp)
2071 {
2072         mir_t   *mir = (mir_t *)q->q_ptr;
2073         uint32_t        type;
2074         uint32_t reason = 0;
2075 
2076         ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
2077 
2078         type = ((union T_primitives *)mp->b_rptr)->type;
2079         switch (mir->mir_type) {
2080         case RPC_CLIENT:
2081                 switch (type) {
2082                 case T_DISCON_IND:
2083                         reason = ((struct T_discon_ind *)
2084                             (mp->b_rptr))->DISCON_reason;
2085                         /*FALLTHROUGH*/
2086                 case T_ORDREL_IND:
2087                         mutex_enter(&mir->mir_mutex);
2088                         if (mir->mir_head_mp) {
2089                                 freemsg(mir->mir_head_mp);
2090                                 mir->mir_head_mp = (mblk_t *)0;
2091                                 mir->mir_tail_mp = (mblk_t *)0;
2092                         }
2093                         /*
2094                          * We are disconnecting, but not necessarily
2095                          * closing. By not closing, we will fail to
2096                          * pick up a possibly changed global timeout value,
2097                          * unless we store it now.
2098                          */
2099                         mir->mir_idle_timeout = clnt_idle_timeout;
2100                         mir_clnt_idle_stop(WR(q), mir);
2101 
2102                         /*
2103                          * Even though we are unconnected, we still
2104                          * leave the idle timer going on the client. The
2105                          * reason for is that if we've disconnected due
2106                          * to a server-side disconnect, reset, or connection
2107                          * timeout, there is a possibility the client may
2108                          * retry the RPC request. This retry needs to done on
2109                          * the same bound address for the server to interpret
2110                          * it as such. However, we don't want
2111                          * to wait forever for that possibility. If the
2112                          * end-point stays unconnected for mir_idle_timeout
2113                          * units of time, then that is a signal to the
2114                          * connection manager to give up waiting for the
2115                          * application (eg. NFS) to send a retry.
2116                          */
2117                         mir_clnt_idle_start(WR(q), mir);
2118                         mutex_exit(&mir->mir_mutex);
2119                         clnt_dispatch_notifyall(WR(q), type, reason);
2120                         freemsg(mp);
2121                         return;
2122                 case T_ERROR_ACK:
2123                 {
2124                         struct T_error_ack      *terror;
2125 
2126                         terror = (struct T_error_ack *)mp->b_rptr;
2127                         RPCLOG(1, "mir_rput_proto T_ERROR_ACK for queue 0x%p",
2128                             (void *)q);
2129                         RPCLOG(1, " ERROR_prim: %s,",
2130                             rpc_tpiprim2name(terror->ERROR_prim));
2131                         RPCLOG(1, " TLI_error: %s,",
2132                             rpc_tpierr2name(terror->TLI_error));
2133                         RPCLOG(1, " UNIX_error: %d\n", terror->UNIX_error);
2134                         if (terror->ERROR_prim == T_DISCON_REQ)  {
2135                                 clnt_dispatch_notifyall(WR(q), type, reason);
2136                                 freemsg(mp);
2137                                 return;
2138                         } else {
2139                                 if (clnt_dispatch_notifyconn(WR(q), mp))
2140                                         return;
2141                         }
2142                         break;
2143                 }
2144                 case T_OK_ACK:
2145                 {
2146                         struct T_ok_ack *tok = (struct T_ok_ack *)mp->b_rptr;
2147 
2148                         if (tok->CORRECT_prim == T_DISCON_REQ) {
2149                                 clnt_dispatch_notifyall(WR(q), type, reason);
2150                                 freemsg(mp);
2151                                 return;
2152                         } else {
2153                                 if (clnt_dispatch_notifyconn(WR(q), mp))
2154                                         return;
2155                         }
2156                         break;
2157                 }
2158                 case T_CONN_CON:
2159                 case T_INFO_ACK:
2160                 case T_OPTMGMT_ACK:
2161                         if (clnt_dispatch_notifyconn(WR(q), mp))
2162                                 return;
2163                         break;
2164                 case T_BIND_ACK:
2165                         break;
2166                 default:
2167                         RPCLOG(1, "mir_rput: unexpected message %d "
2168                             "for KRPC client\n",
2169                             ((union T_primitives *)mp->b_rptr)->type);
2170                         break;
2171                 }
2172                 break;
2173 
2174         case RPC_SERVER:
2175                 switch (type) {
2176                 case T_BIND_ACK:
2177                 {
2178                         struct T_bind_ack       *tbind;
2179 
2180                         /*
2181                          * If this is a listening stream, then shut
2182                          * off the idle timer.
2183                          */
2184                         tbind = (struct T_bind_ack *)mp->b_rptr;
2185                         if (tbind->CONIND_number > 0) {
2186                                 mutex_enter(&mir->mir_mutex);
2187                                 mir_svc_idle_stop(WR(q), mir);
2188 
2189                                 /*
2190                                  * mark this as a listen endpoint
2191                                  * for special handling.
2192                                  */
2193 
2194                                 mir->mir_listen_stream = 1;
2195                                 mutex_exit(&mir->mir_mutex);
2196                         }
2197                         break;
2198                 }
2199                 case T_DISCON_IND:
2200                 case T_ORDREL_IND:
2201                         RPCLOG(16, "mir_rput_proto: got %s indication\n",
2202                             type == T_DISCON_IND ? "disconnect"
2203                             : "orderly release");
2204 
2205                         /*
2206                          * For listen endpoint just pass
2207                          * on the message.
2208                          */
2209 
2210                         if (mir->mir_listen_stream)
2211                                 break;
2212 
2213 
2214                         mutex_enter(&mir->mir_mutex);
2215 
2216                         /*
2217                          * If client wants to break off connection, record
2218                          * that fact.
2219                          */
2220                         mir_svc_start_close(WR(q), mir);
2221 
2222                         /*
2223                          * If we are idle, then send the orderly release
2224                          * or disconnect indication to nfsd.
2225                          */
2226                         if (MIR_SVC_QUIESCED(mir)) {
2227                                 mutex_exit(&mir->mir_mutex);
2228                                 break;
2229                         }
2230 
2231                         RPCLOG(16, "mir_rput_proto: not idle, so "
2232                             "disconnect/ord rel indication not passed "
2233                             "upstream on 0x%p\n", (void *)q);
2234 
2235                         /*
2236                          * Hold the indication until we get idle
2237                          * If there already is an indication stored,
2238                          * replace it if the new one is a disconnect. The
2239                          * reasoning is that disconnection takes less time
2240                          * to process, and once a client decides to
2241                          * disconnect, we should do that.
2242                          */
2243                         if (mir->mir_svc_pend_mp) {
2244                                 if (type == T_DISCON_IND) {
2245                                         RPCLOG(16, "mir_rput_proto: replacing"
2246                                             " held disconnect/ord rel"
2247                                             " indication with disconnect on"
2248                                             " 0x%p\n", (void *)q);
2249 
2250                                         freemsg(mir->mir_svc_pend_mp);
2251                                         mir->mir_svc_pend_mp = mp;
2252                                 } else {
2253                                         RPCLOG(16, "mir_rput_proto: already "
2254                                             "held a disconnect/ord rel "
2255                                             "indication. freeing ord rel "
2256                                             "ind on 0x%p\n", (void *)q);
2257                                         freemsg(mp);
2258                                 }
2259                         } else
2260                                 mir->mir_svc_pend_mp = mp;
2261 
2262                         mutex_exit(&mir->mir_mutex);
2263                         return;
2264 
2265                 default:
2266                         /* nfsd handles server-side non-data messages. */
2267                         break;
2268                 }
2269                 break;
2270 
2271         default:
2272                 break;
2273         }
2274 
2275         putnext(q, mp);
2276 }
2277 
2278 /*
2279  * The server-side read queues are used to hold inbound messages while
2280  * outbound flow control is exerted.  When outbound flow control is
2281  * relieved, mir_wsrv qenables the read-side queue.  Read-side queues
2282  * are not enabled by STREAMS and are explicitly noenable'ed in mir_open.
2283  *
 * For the server side, we have two types of messages queued. The first type
 * are messages that are ready to be XDR decoded and then sent to the
 * RPC program's dispatch routine. The second type are "raw" messages that
 * haven't been processed, i.e. assembled from RPC record fragments into
 * full requests. The only time we will see the second type of message
 * queued is if we have a memory allocation failure while processing
 * a raw message. The field mir_first_non_processed_mblk will mark the
 * first such raw message. So the flow for server side is:
2292  *
2293  *      - send processed queued messages to kRPC until we run out or find
2294  *        one that needs additional processing because we were short on memory
2295  *        earlier
2296  *      - process a message that was deferred because of lack of
2297  *        memory
2298  *      - continue processing messages until the queue empties or we
2299  *        have to stop because of lack of memory
2300  *      - during each of the above phase, if the queue is empty and
2301  *        there are no pending messages that were passed to the RPC
2302  *        layer, send upstream the pending disconnect/ordrel indication if
2303  *        there is one
2304  *
2305  * The read-side queue is also enabled by a bufcall callback if dupmsg
2306  * fails in mir_rput.
2307  */
2308 static void
2309 mir_rsrv(queue_t *q)
2310 {
2311         mir_t   *mir;
2312         mblk_t  *mp;
2313         mblk_t  *cmp = NULL;
2314         boolean_t stop_timer = B_FALSE;
2315 
2316         mir = (mir_t *)q->q_ptr;
2317         mutex_enter(&mir->mir_mutex);
2318 
2319         mp = NULL;
2320         switch (mir->mir_type) {
2321         case RPC_SERVER:
2322                 if (mir->mir_ref_cnt == 0)
2323                         mir->mir_hold_inbound = 0;
2324                 if (mir->mir_hold_inbound) {
2325 
2326                         ASSERT(cmp == NULL);
2327                         if (q->q_first == NULL) {
2328 
2329                                 MIR_CLEAR_INRSRV(mir);
2330 
2331                                 if (MIR_SVC_QUIESCED(mir)) {
2332                                         cmp = mir->mir_svc_pend_mp;
2333                                         mir->mir_svc_pend_mp = NULL;
2334                                 }
2335                         }
2336 
2337                         mutex_exit(&mir->mir_mutex);
2338 
2339                         if (cmp != NULL) {
2340                                 RPCLOG(16, "mir_rsrv: line %d: sending a held "
2341                                     "disconnect/ord rel indication upstream\n",
2342                                     __LINE__);
2343                                 putnext(q, cmp);
2344                         }
2345 
2346                         return;
2347                 }
2348                 while (mp = getq(q)) {
2349                         if (mir->mir_krpc_cell &&
2350                             (mir->mir_svc_no_more_msgs == 0)) {
2351                                 /*
2352                                  * If we were idle, turn off idle timer since
2353                                  * we aren't idle any more.
2354                                  */
2355                                 if (mir->mir_ref_cnt++ == 0)
2356                                         stop_timer = B_TRUE;
2357                                 if (mir_check_len(q,
2358                                     (int32_t)msgdsize(mp), mp))
2359                                         return;
2360                                 svc_queuereq(q, mp);
2361                         } else {
2362                                 /*
2363                                  * Count # of times this happens. Should be
2364                                  * never, but experience shows otherwise.
2365                                  */
2366                                 if (mir->mir_krpc_cell == NULL)
2367                                         mir_krpc_cell_null++;
2368                                 freemsg(mp);
2369                         }
2370                 }
2371                 break;
2372         case RPC_CLIENT:
2373                 break;
2374         default:
2375                 RPCLOG(1, "mir_rsrv: unexpected mir_type %d\n", mir->mir_type);
2376 
2377                 if (q->q_first == NULL)
2378                         MIR_CLEAR_INRSRV(mir);
2379 
2380                 mutex_exit(&mir->mir_mutex);
2381 
2382                 return;
2383         }
2384 
2385         /*
2386          * The timer is stopped after all the messages are processed.
2387          * The reason is that stopping the timer releases the mir_mutex
2388          * lock temporarily.  This means that the request can be serviced
2389          * while we are still processing the message queue.  This is not
2390          * good.  So we stop the timer here instead.
2391          */
2392         if (stop_timer)  {
2393                 RPCLOG(16, "mir_rsrv stopping idle timer on 0x%p because ref "
2394                     "cnt going to non zero\n", (void *)WR(q));
2395                 mir_svc_idle_stop(WR(q), mir);
2396         }
2397 
2398         if (q->q_first == NULL) {
2399 
2400                 MIR_CLEAR_INRSRV(mir);
2401 
2402                 ASSERT(cmp == NULL);
2403                 if (mir->mir_type == RPC_SERVER && MIR_SVC_QUIESCED(mir)) {
2404                         cmp = mir->mir_svc_pend_mp;
2405                         mir->mir_svc_pend_mp = NULL;
2406                 }
2407 
2408                 mutex_exit(&mir->mir_mutex);
2409 
2410                 if (cmp != NULL) {
2411                         RPCLOG(16, "mir_rsrv: line %d: sending a held "
2412                             "disconnect/ord rel indication upstream\n",
2413                             __LINE__);
2414                         putnext(q, cmp);
2415                 }
2416 
2417                 return;
2418         }
2419         mutex_exit(&mir->mir_mutex);
2420 }
2421 
2422 static int mir_svc_policy_fails;
2423 
2424 /*
2425  * Called to send an event code to nfsd/lockd so that it initiates
2426  * connection close.
2427  */
2428 static int
2429 mir_svc_policy_notify(queue_t *q, int event)
2430 {
2431         mblk_t  *mp;
2432 #ifdef DEBUG
2433         mir_t *mir = (mir_t *)q->q_ptr;
2434         ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
2435 #endif
2436         ASSERT(q->q_flag & QREADR);
2437 
2438         /*
2439          * Create an M_DATA message with the event code and pass it to the
2440          * Stream head (nfsd or whoever created the stream will consume it).
2441          */
2442         mp = allocb(sizeof (int), BPRI_HI);
2443 
2444         if (!mp) {
2445 
2446                 mir_svc_policy_fails++;
2447                 RPCLOG(16, "mir_svc_policy_notify: could not allocate event "
2448                     "%d\n", event);
2449                 return (ENOMEM);
2450         }
2451 
2452         U32_TO_BE32(event, mp->b_rptr);
2453         mp->b_wptr = mp->b_rptr + sizeof (int);
2454         putnext(q, mp);
2455         return (0);
2456 }
2457 
2458 /*
2459  * Server side: start the close phase. We want to get this rpcmod slot in an
2460  * idle state before mir_close() is called.
2461  */
2462 static void
2463 mir_svc_start_close(queue_t *wq, mir_t *mir)
2464 {
2465         ASSERT(MUTEX_HELD(&mir->mir_mutex));
2466         ASSERT((wq->q_flag & QREADR) == 0);
2467         ASSERT(mir->mir_type == RPC_SERVER);
2468 
2469 
2470         /*
2471          * Do not accept any more messages.
2472          */
2473         mir->mir_svc_no_more_msgs = 1;
2474 
2475         /*
2476          * Next two statements will make the read service procedure invoke
2477          * svc_queuereq() on everything stuck in the streams read queue.
2478          * It's not necessary because enabling the write queue will
2479          * have the same effect, but why not speed the process along?
2480          */
2481         mir->mir_hold_inbound = 0;
2482         qenable(RD(wq));
2483 
2484         /*
2485          * Meanwhile force the write service procedure to send the
2486          * responses downstream, regardless of flow control.
2487          */
2488         qenable(wq);
2489 }
2490 
2491 /*
2492  * This routine is called directly by KRPC after a request is completed,
2493  * whether a reply was sent or the request was dropped.
2494  */
2495 static void
2496 mir_svc_release(queue_t *wq, mblk_t *mp)
2497 {
2498         mir_t   *mir = (mir_t *)wq->q_ptr;
2499         mblk_t  *cmp = NULL;
2500 
2501         ASSERT((wq->q_flag & QREADR) == 0);
2502         if (mp)
2503                 freemsg(mp);
2504 
2505         mutex_enter(&mir->mir_mutex);
2506 
2507         /*
2508          * Start idle processing if this is the last reference.
2509          */
2510         if ((mir->mir_ref_cnt == 1) && (mir->mir_inrservice == 0)) {
2511                 cmp = mir->mir_svc_pend_mp;
2512                 mir->mir_svc_pend_mp = NULL;
2513         }
2514 
2515         if (cmp) {
2516                 RPCLOG(16, "mir_svc_release: sending a held "
2517                     "disconnect/ord rel indication upstream on queue 0x%p\n",
2518                     (void *)RD(wq));
2519 
2520                 mutex_exit(&mir->mir_mutex);
2521 
2522                 putnext(RD(wq), cmp);
2523 
2524                 mutex_enter(&mir->mir_mutex);
2525         }
2526 
2527         /*
2528          * Start idle processing if this is the last reference.
2529          */
2530         if (mir->mir_ref_cnt == 1 && mir->mir_inrservice == 0) {
2531 
2532                 RPCLOG(16, "mir_svc_release starting idle timer on 0x%p "
2533                     "because ref cnt is zero\n", (void *) wq);
2534 
2535                 mir_svc_idle_start(wq, mir);
2536         }
2537 
2538         mir->mir_ref_cnt--;
2539         ASSERT(mir->mir_ref_cnt >= 0);
2540 
2541         /*
2542          * Wake up the thread waiting to close.
2543          */
2544 
2545         if ((mir->mir_ref_cnt == 0) && mir->mir_closing)
2546                 cv_signal(&mir->mir_condvar);
2547 
2548         mutex_exit(&mir->mir_mutex);
2549 }
2550 
2551 /*
2552  * This routine is called by server-side KRPC when it is ready to
2553  * handle inbound messages on the stream.
2554  */
2555 static void
2556 mir_svc_start(queue_t *wq)
2557 {
2558         mir_t   *mir = (mir_t *)wq->q_ptr;
2559 
2560         /*
2561          * no longer need to take the mir_mutex because the
2562          * mir_setup_complete field has been moved out of
2563          * the binary field protected by the mir_mutex.
2564          */
2565 
2566         mir->mir_setup_complete = 1;
2567         qenable(RD(wq));
2568 }
2569 
2570 /*
2571  * client side wrapper for stopping timer with normal idle timeout.
2572  */
2573 static void
2574 mir_clnt_idle_stop(queue_t *wq, mir_t *mir)
2575 {
2576         ASSERT(MUTEX_HELD(&mir->mir_mutex));
2577         ASSERT((wq->q_flag & QREADR) == 0);
2578         ASSERT(mir->mir_type == RPC_CLIENT);
2579 
2580         mir_timer_stop(mir);
2581 }
2582 
2583 /*
2584  * client side wrapper for stopping timer with normal idle timeout.
2585  */
2586 static void
2587 mir_clnt_idle_start(queue_t *wq, mir_t *mir)
2588 {
2589         ASSERT(MUTEX_HELD(&mir->mir_mutex));
2590         ASSERT((wq->q_flag & QREADR) == 0);
2591         ASSERT(mir->mir_type == RPC_CLIENT);
2592 
2593         mir_timer_start(wq, mir, mir->mir_idle_timeout);
2594 }
2595 
2596 /*
2597  * client side only. Forces rpcmod to stop sending T_ORDREL_REQs on
2598  * end-points that aren't connected.
2599  */
2600 static void
2601 mir_clnt_idle_do_stop(queue_t *wq)
2602 {
2603         mir_t   *mir = (mir_t *)wq->q_ptr;
2604 
2605         RPCLOG(1, "mir_clnt_idle_do_stop: wq 0x%p\n", (void *)wq);
2606         ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
2607         mutex_enter(&mir->mir_mutex);
2608         mir_clnt_idle_stop(wq, mir);
2609         mutex_exit(&mir->mir_mutex);
2610 }
2611 
2612 /*
2613  * Timer handler.  It handles idle timeout and memory shortage problem.
2614  */
2615 static void
2616 mir_timer(void *arg)
2617 {
2618         queue_t *wq = (queue_t *)arg;
2619         mir_t *mir = (mir_t *)wq->q_ptr;
2620         boolean_t notify;
2621 
2622         mutex_enter(&mir->mir_mutex);
2623 
2624         /*
2625          * mir_timer_call is set only when either mir_timer_[start|stop]
2626          * is progressing.  And mir_timer() can only be run while they
2627          * are progressing if the timer is being stopped.  So just
2628          * return.
2629          */
2630         if (mir->mir_timer_call) {
2631                 mutex_exit(&mir->mir_mutex);
2632                 return;
2633         }
2634         mir->mir_timer_id = 0;
2635 
2636         switch (mir->mir_type) {
2637         case RPC_CLIENT:
2638 
2639                 /*
2640                  * For clients, the timer fires at clnt_idle_timeout
2641                  * intervals.  If the activity marker (mir_clntreq) is
2642                  * zero, then the stream has been idle since the last
2643                  * timer event and we notify KRPC.  If mir_clntreq is
2644                  * non-zero, then the stream is active and we just
2645                  * restart the timer for another interval.  mir_clntreq
2646                  * is set to 1 in mir_wput for every request passed
2647                  * downstream.
2648                  *
2649                  * If this was a memory shortage timer reset the idle
2650                  * timeout regardless; the mir_clntreq will not be a
2651                  * valid indicator.
2652                  *
2653                  * The timer is initially started in mir_wput during
2654                  * RPC_CLIENT ioctl processing.
2655                  *
2656                  * The timer interval can be changed for individual
2657                  * streams with the ND variable "mir_idle_timeout".
2658                  */
2659                 if (mir->mir_clntreq > 0 && mir->mir_use_timestamp +
2660                     MSEC_TO_TICK(mir->mir_idle_timeout) - lbolt >= 0) {
2661                         clock_t tout;
2662 
2663                         tout = mir->mir_idle_timeout -
2664                             TICK_TO_MSEC(lbolt - mir->mir_use_timestamp);
2665                         if (tout < 0)
2666                                 tout = 1000;
2667 #if 0
2668                         printf("mir_timer[%d < %d + %d]: reset client timer "
2669                             "to %d (ms)\n", TICK_TO_MSEC(lbolt),
2670                             TICK_TO_MSEC(mir->mir_use_timestamp),
2671                             mir->mir_idle_timeout, tout);
2672 #endif
2673                         mir->mir_clntreq = 0;
2674                         mir_timer_start(wq, mir, tout);
2675                         mutex_exit(&mir->mir_mutex);
2676                         return;
2677                 }
2678 #if 0
2679 printf("mir_timer[%d]: doing client timeout\n", lbolt / hz);
2680 #endif
2681                 /*
2682                  * We are disconnecting, but not necessarily
2683                  * closing. By not closing, we will fail to
2684                  * pick up a possibly changed global timeout value,
2685                  * unless we store it now.
2686                  */
2687                 mir->mir_idle_timeout = clnt_idle_timeout;
2688                 mir_clnt_idle_start(wq, mir);
2689 
2690                 mutex_exit(&mir->mir_mutex);
2691                 /*
2692                  * We pass T_ORDREL_REQ as an integer value
2693                  * to KRPC as the indication that the stream
2694                  * is idle.  This is not a T_ORDREL_REQ message,
2695                  * it is just a convenient value since we call
2696                  * the same KRPC routine for T_ORDREL_INDs and
2697                  * T_DISCON_INDs.
2698                  */
2699                 clnt_dispatch_notifyall(wq, T_ORDREL_REQ, 0);
2700                 return;
2701 
2702         case RPC_SERVER:
2703 
2704                 /*
2705                  * For servers, the timer is only running when the stream
2706                  * is really idle or memory is short.  The timer is started
2707                  * by mir_wput when mir_type is set to RPC_SERVER and
2708                  * by mir_svc_idle_start whenever the stream goes idle
2709                  * (mir_ref_cnt == 0).  The timer is cancelled in
2710                  * mir_rput whenever a new inbound request is passed to KRPC
2711                  * and the stream was previously idle.
2712                  *
2713                  * The timer interval can be changed for individual
2714                  * streams with the ND variable "mir_idle_timeout".
2715                  *
2716                  * If the stream is not idle do nothing.
2717                  */
2718                 if (!MIR_SVC_QUIESCED(mir)) {
2719                         mutex_exit(&mir->mir_mutex);
2720                         return;
2721                 }
2722 
2723                 notify = !mir->mir_inrservice;
2724                 mutex_exit(&mir->mir_mutex);
2725 
2726                 /*
2727                  * If there is no packet queued up in read queue, the stream
2728                  * is really idle so notify nfsd to close it.
2729                  */
2730                 if (notify) {
2731                         RPCLOG(16, "mir_timer: telling stream head listener "
2732                             "to close stream (0x%p)\n", (void *) RD(wq));
2733                         (void) mir_svc_policy_notify(RD(wq), 1);
2734                 }
2735                 return;
2736         default:
2737                 RPCLOG(1, "mir_timer: unexpected mir_type %d\n",
2738                     mir->mir_type);
2739                 mutex_exit(&mir->mir_mutex);
2740                 return;
2741         }
2742 }
2743 
2744 /*
2745  * Called by the RPC package to send either a call or a return, or a
2746  * transport connection request.  Adds the record marking header.
2747  */
2748 static void
2749 mir_wput(queue_t *q, mblk_t *mp)
2750 {
2751         uint_t           frag_header;
2752         mir_t           *mir = (mir_t *)q->q_ptr;
2753         uchar_t         *rptr = mp->b_rptr;
2754         uint32_t         xid;
2755         uint32_t         dir;
2756 
2757         if (!mir) {
2758                 freemsg(mp);
2759                 return;
2760         }
2761 
2762         if (mp->b_datap->db_type != M_DATA) {
2763                 mir_wput_other(q, mp);
2764                 return;
2765         }
2766 
2767         if (mir->mir_ordrel_pending == 1) {
2768                 freemsg(mp);
2769                 RPCLOG(16, "mir_wput wq 0x%p: got data after T_ORDREL_REQ\n",
2770                     (void *)q);
2771                 return;
2772         }
2773 
2774         frag_header = (uint_t)DLEN(mp);
2775         frag_header |= MIR_LASTFRAG;
2776 
2777         /* Stick in the 4 byte record marking header. */
2778         if ((rptr - mp->b_datap->db_base) < sizeof (uint32_t) ||
2779             !IS_P2ALIGNED(mp->b_rptr, sizeof (uint32_t))) {
2780                 /*
2781                  * Since we know that M_DATA messages are created exclusively
2782                  * by KRPC, we expect that KRPC will leave room for our header
2783                  * and 4 byte align which is normal for XDR.
2784                  * If KRPC (or someone else) does not cooperate, then we
2785                  * just throw away the message.
2786                  */
2787                 RPCLOG(1, "mir_wput: KRPC did not leave space for record "
2788                     "fragment header (%d bytes left)\n",
2789                     (int)(rptr - mp->b_datap->db_base));
2790                 freemsg(mp);
2791                 return;
2792         }
2793         rptr -= sizeof (uint32_t);
2794         *(uint32_t *)rptr = htonl(frag_header);
2795         mp->b_rptr = rptr;
2796 
2797         mutex_enter(&mir->mir_mutex);
2798         if (mir->mir_type == RPC_CLIENT) {
2799                 /*
2800                  * For the client, set mir_clntreq to indicate that the
2801                  * connection is active.
2802                  */
2803                 mir->mir_clntreq = 1;
2804                 mir->mir_use_timestamp = lbolt;
2805         }
2806 
2807         /*
2808          * If we haven't already queued some data and the downstream module
2809          * can accept more data, send it on, otherwise we queue the message
2810          * and take other actions depending on mir_type.
2811          */
2812         if (!mir->mir_inwservice && MIR_WCANPUTNEXT(mir, q)) {
2813                 mutex_exit(&mir->mir_mutex);
2814 
2815                 /*
2816                  * Now we pass the RPC message downstream.
2817                  */
2818                 putnext(q, mp);
2819                 return;
2820         }
2821 
2822         /*
2823          * Get msg direction and handle to the appropriate ctxt
2824          */
2825         if (!mir_dir_xid(mp, &dir, &xid)) {
2826                 /* XXX - if we can't get the dir, we're hosed */
2827                 mutex_exit(&mir->mir_mutex);
2828                 freemsg(mp);
2829                 return;
2830         }
2831 
2832         switch (mir->mir_type) {
2833         case RPC_CLIENT:
2834                 /*
2835                  * Check for a previous duplicate request on the
2836                  * queue.  If there is one, then we throw away
2837                  * the current message and let the previous one
2838                  * go through.  If we can't find a duplicate, then
2839                  * send this one.  This tap dance is an effort
2840                  * to reduce traffic and processing requirements
2841                  * under load conditions.
2842                  */
2843                 if (mir_clnt_dup_request(q, mp)) {
2844                         mutex_exit(&mir->mir_mutex);
2845                         freemsg(mp);
2846                         return;
2847                 }
2848                 break;
2849 
2850         case RPC_SERVER:
2851                 switch (dir) {
2852                 case CALL:
2853                         /*
2854                          * RPC Server doing Callball RPC
2855                          */
2856                         if (mir_clnt_dup_request(q, mp)) {
2857                                 mutex_exit(&mir->mir_mutex);
2858                                 freemsg(mp);
2859                                 return;
2860                         }
2861                         break;
2862 
2863                 case REPLY:
2864                 default:
2865                         /*
2866                          * Set mir_hold_inbound so that new inbound RPC
2867                          * messages will be held until the client catches
2868                          * up on the earlier replies.  This flag is cleared
2869                          * in mir_wsrv after flow control is relieved;
2870                          * the read-side queue is also enabled at that time.
2871                          */
2872                         mir->mir_hold_inbound = 1;
2873                         break;
2874                 }
2875                 break;
2876         default:
2877                 RPCLOG(1, "mir_wput: unexpected mir_type %d\n", mir->mir_type);
2878                 break;
2879         }
2880         mir->mir_inwservice = 1;
2881         (void) putq(q, mp);
2882         mutex_exit(&mir->mir_mutex);
2883 }
2884 
2885 static void
2886 mir_wput_other(queue_t *q, mblk_t *mp)
2887 {
2888         mir_t   *mir = (mir_t *)q->q_ptr;
2889         struct iocblk   *iocp;
2890         uchar_t *rptr = mp->b_rptr;
2891         bool_t  flush_in_svc = FALSE;
2892 
2893         ASSERT(MUTEX_NOT_HELD(&mir->mir_mutex));
2894         switch (mp->b_datap->db_type) {
2895         case M_IOCTL:
2896                 iocp = (struct iocblk *)rptr;
2897                 switch (iocp->ioc_cmd) {
2898                 case RPC_CLIENT:
2899                         mutex_enter(&mir->mir_mutex);
2900                         if (mir->mir_type != 0 &&
2901                             mir->mir_type != iocp->ioc_cmd) {
2902 ioc_eperm:
2903                                 mutex_exit(&mir->mir_mutex);
2904                                 iocp->ioc_error = EPERM;
2905                                 iocp->ioc_count = 0;
2906                                 mp->b_datap->db_type = M_IOCACK;
2907                                 qreply(q, mp);
2908                                 return;
2909                         }
2910 
2911                         mir->mir_type = iocp->ioc_cmd;
2912 
2913                         /*
2914                          * Clear mir_hold_inbound which was set to 1 by
2915                          * mir_open.  This flag is not used on client
2916                          * streams.
2917                          */
2918                         mir->mir_hold_inbound = 0;
2919                         mir->mir_max_msg_sizep = &clnt_max_msg_size;
2920 
2921                         /*
2922                          * Start the idle timer.  See mir_timer() for more
2923                          * information on how client timers work.
2924                          */
2925                         mir->mir_idle_timeout = clnt_idle_timeout;
2926                         mir_clnt_idle_start(q, mir);
2927                         mutex_exit(&mir->mir_mutex);
2928 
2929                         mp->b_datap->db_type = M_IOCACK;
2930                         qreply(q, mp);
2931                         return;
2932                 case RPC_SERVER:
2933                         mutex_enter(&mir->mir_mutex);
2934                         if (mir->mir_type != 0 &&
2935                             mir->mir_type != iocp->ioc_cmd)
2936                                 goto ioc_eperm;
2937 
2938                         /*
2939                          * We don't clear mir_hold_inbound here because
2940                          * mir_hold_inbound is used in the flow control
2941                          * model. If we cleared it here, then we'd commit
2942                          * a small violation to the model where the transport
2943                          * might immediately block downstream flow.
2944                          */
2945 
2946                         mir->mir_type = iocp->ioc_cmd;
2947                         mir->mir_max_msg_sizep = &svc_max_msg_size;
2948 
2949                         /*
2950                          * Start the idle timer.  See mir_timer() for more
2951                          * information on how server timers work.
2952                          *
2953                          * Note that it is important to start the idle timer
2954                          * here so that connections time out even if we
2955                          * never receive any data on them.
2956                          */
2957                         mir->mir_idle_timeout = svc_idle_timeout;
2958                         RPCLOG(16, "mir_wput_other starting idle timer on 0x%p "
2959                             "because we got RPC_SERVER ioctl\n", (void *)q);
2960                         mir_svc_idle_start(q, mir);
2961                         mutex_exit(&mir->mir_mutex);
2962 
2963                         mp->b_datap->db_type = M_IOCACK;
2964                         qreply(q, mp);
2965                         return;
2966                 default:
2967                         break;
2968                 }
2969                 break;
2970 
2971         case M_PROTO:
2972                 if (mir->mir_type == RPC_CLIENT) {
2973                         /*
2974                          * We are likely being called from the context of a
2975                          * service procedure. So we need to enqueue. However
2976                          * enqueing may put our message behind data messages.
2977                          * So flush the data first.
2978                          */
2979                         flush_in_svc = TRUE;
2980                 }
2981                 if ((mp->b_wptr - rptr) < sizeof (uint32_t) ||
2982                     !IS_P2ALIGNED(rptr, sizeof (uint32_t)))
2983                         break;
2984 
2985                 switch (((union T_primitives *)rptr)->type) {
2986                 case T_DATA_REQ:
2987                         /* Don't pass T_DATA_REQ messages downstream. */
2988                         freemsg(mp);
2989                         return;
2990                 case T_ORDREL_REQ:
2991                         RPCLOG(8, "mir_wput_other wq 0x%p: got T_ORDREL_REQ\n",
2992                             (void *)q);
2993                         mutex_enter(&mir->mir_mutex);
2994                         if (mir->mir_type != RPC_SERVER) {
2995                                 /*
2996                                  * We are likely being called from
2997                                  * clnt_dispatch_notifyall(). Sending
2998                                  * a T_ORDREL_REQ will result in
2999                                  * a some kind of _IND message being sent,
3000                                  * will be another call to
3001                                  * clnt_dispatch_notifyall(). To keep the stack
3002                                  * lean, queue this message.
3003                                  */
3004                                 mir->mir_inwservice = 1;
3005                                 (void) putq(q, mp);
3006                                 mutex_exit(&mir->mir_mutex);
3007                                 return;
3008                         }
3009 
3010                         /*
3011                          * Mark the structure such that we don't accept any
3012                          * more requests from client. We could defer this
3013                          * until we actually send the orderly release
3014                          * request downstream, but all that does is delay
3015                          * the closing of this stream.
3016                          */
3017                         RPCLOG(16, "mir_wput_other wq 0x%p: got T_ORDREL_REQ "
3018                             " so calling mir_svc_start_close\n", (void *)q);
3019 
3020                         mir_svc_start_close(q, mir);
3021 
3022                         /*
3023                          * If we have sent down a T_ORDREL_REQ, don't send
3024                          * any more.
3025                          */
3026                         if (mir->mir_ordrel_pending) {
3027                                 freemsg(mp);
3028                                 mutex_exit(&mir->mir_mutex);
3029                                 return;
3030                         }
3031 
3032                         /*
3033                          * If the stream is not idle, then we hold the
3034                          * orderly release until it becomes idle.  This
3035                          * ensures that KRPC will be able to reply to
3036                          * all requests that we have passed to it.
3037                          *
3038                          * We also queue the request if there is data already
3039                          * queued, because we cannot allow the T_ORDREL_REQ
3040                          * to go before data. When we had a separate reply
3041                          * count, this was not a problem, because the
3042                          * reply count was reconciled when mir_wsrv()
3043                          * completed.
3044                          */
3045                         if (!MIR_SVC_QUIESCED(mir) ||
3046                             mir->mir_inwservice == 1) {
3047                                 mir->mir_inwservice = 1;
3048                                 (void) putq(q, mp);
3049 
3050                                 RPCLOG(16, "mir_wput_other: queuing "
3051                                     "T_ORDREL_REQ on 0x%p\n", (void *)q);
3052 
3053                                 mutex_exit(&mir->mir_mutex);
3054                                 return;
3055                         }
3056 
3057                         /*
3058                          * Mark the structure so that we know we sent
3059                          * an orderly release request, and reset the idle timer.
3060                          */
3061                         mir->mir_ordrel_pending = 1;
3062 
3063                         RPCLOG(16, "mir_wput_other: calling mir_svc_idle_start"
3064                             " on 0x%p because we got T_ORDREL_REQ\n",
3065                             (void *)q);
3066 
3067                         mir_svc_idle_start(q, mir);
3068                         mutex_exit(&mir->mir_mutex);
3069 
3070                         /*
3071                          * When we break, we will putnext the T_ORDREL_REQ.
3072                          */
3073                         break;
3074 
3075                 case T_CONN_REQ:
3076                         mutex_enter(&mir->mir_mutex);
3077                         if (mir->mir_head_mp != NULL) {
3078                                 freemsg(mir->mir_head_mp);
3079                                 mir->mir_head_mp = NULL;
3080                                 mir->mir_tail_mp = NULL;
3081                         }
3082                         mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
3083                         /*
3084                          * Restart timer in case mir_clnt_idle_do_stop() was
3085                          * called.
3086                          */
3087                         mir->mir_idle_timeout = clnt_idle_timeout;
3088                         mir_clnt_idle_stop(q, mir);
3089                         mir_clnt_idle_start(q, mir);
3090                         mutex_exit(&mir->mir_mutex);
3091                         break;
3092 
3093                 default:
3094                         /*
3095                          * T_DISCON_REQ is one of the interesting default
3096                          * cases here. Ideally, an M_FLUSH is done before
3097                          * T_DISCON_REQ is done. However, that is somewhat
3098                          * cumbersome for clnt_cots.c to do. So we queue
3099                          * T_DISCON_REQ, and let the service procedure
3100                          * flush all M_DATA.
3101                          */
3102                         break;
3103                 }
3104                 /* fallthru */;
3105         default:
3106                 if (mp->b_datap->db_type >= QPCTL) {
3107                         if (mp->b_datap->db_type == M_FLUSH) {
3108                                 if (mir->mir_type == RPC_CLIENT &&
3109                                     *mp->b_rptr & FLUSHW) {
3110                                         RPCLOG(32, "mir_wput_other: flushing "
3111                                             "wq 0x%p\n", (void *)q);
3112                                         if (*mp->b_rptr & FLUSHBAND) {
3113                                                 flushband(q, *(mp->b_rptr + 1),
3114                                                     FLUSHDATA);
3115                                         } else {
3116                                                 flushq(q, FLUSHDATA);
3117                                         }
3118                                 } else {
3119                                         RPCLOG(32, "mir_wput_other: ignoring "
3120                                             "M_FLUSH on wq 0x%p\n", (void *)q);
3121                                 }
3122                         }
3123                         break;
3124                 }
3125 
3126                 mutex_enter(&mir->mir_mutex);
3127                 if (mir->mir_inwservice == 0 && MIR_WCANPUTNEXT(mir, q)) {
3128                         mutex_exit(&mir->mir_mutex);
3129                         break;
3130                 }
3131                 mir->mir_inwservice = 1;
3132                 mir->mir_inwflushdata = flush_in_svc;
3133                 (void) putq(q, mp);
3134                 mutex_exit(&mir->mir_mutex);
3135                 qenable(q);
3136 
3137                 return;
3138         }
3139         putnext(q, mp);
3140 }
3141 
3142 static void
3143 mir_wsrv(queue_t *q)
3144 {
3145         mblk_t  *mp;
3146         mir_t   *mir;
3147         bool_t flushdata;
3148 
3149         mir = (mir_t *)q->q_ptr;
3150         mutex_enter(&mir->mir_mutex);
3151 
3152         flushdata = mir->mir_inwflushdata;
3153         mir->mir_inwflushdata = 0;
3154 
3155         while (mp = getq(q)) {
3156                 if (mp->b_datap->db_type == M_DATA) {
3157                         /*
3158                          * Do not send any more data if we have sent
3159                          * a T_ORDREL_REQ.
3160                          */
3161                         if (flushdata || mir->mir_ordrel_pending == 1) {
3162                                 freemsg(mp);
3163                                 continue;
3164                         }
3165 
3166                         /*
3167                          * Make sure that the stream can really handle more
3168                          * data.
3169                          */
3170                         if (!MIR_WCANPUTNEXT(mir, q)) {
3171                                 (void) putbq(q, mp);
3172                                 mutex_exit(&mir->mir_mutex);
3173                                 return;
3174                         }
3175 
3176                         /*
3177                          * Now we pass the RPC message downstream.
3178                          */
3179                         mutex_exit(&mir->mir_mutex);
3180                         putnext(q, mp);
3181                         mutex_enter(&mir->mir_mutex);
3182                         continue;
3183                 }
3184 
3185                 /*
3186                  * This is not an RPC message, pass it downstream
3187                  * (ignoring flow control) if the server side is not sending a
3188                  * T_ORDREL_REQ downstream.
3189                  */
3190                 if (mir->mir_type != RPC_SERVER ||
3191                     ((union T_primitives *)mp->b_rptr)->type !=
3192                     T_ORDREL_REQ) {
3193                         mutex_exit(&mir->mir_mutex);
3194                         putnext(q, mp);
3195                         mutex_enter(&mir->mir_mutex);
3196                         continue;
3197                 }
3198 
3199                 if (mir->mir_ordrel_pending == 1) {
3200                         /*
3201                          * Don't send two T_ORDRELs
3202                          */
3203                         freemsg(mp);
3204                         continue;
3205                 }
3206 
3207                 /*
3208                  * Mark the structure so that we know we sent an orderly
3209                  * release request.  We will check to see slot is idle at the
3210                  * end of this routine, and if so, reset the idle timer to
3211                  * handle orderly release timeouts.
3212                  */
3213                 mir->mir_ordrel_pending = 1;
3214                 RPCLOG(16, "mir_wsrv: sending ordrel req on q 0x%p\n",
3215                     (void *)q);
3216                 /*
3217                  * Send the orderly release downstream. If there are other
3218                  * pending replies we won't be able to send them.  However,
3219                  * the only reason we should send the orderly release is if
3220                  * we were idle, or if an unusual event occurred.
3221                  */
3222                 mutex_exit(&mir->mir_mutex);
3223                 putnext(q, mp);
3224                 mutex_enter(&mir->mir_mutex);
3225         }
3226 
3227         if (q->q_first == NULL)
3228                 /*
3229                  * If we call mir_svc_idle_start() below, then
3230                  * clearing mir_inwservice here will also result in
3231                  * any thread waiting in mir_close() to be signaled.
3232                  */
3233                 mir->mir_inwservice = 0;
3234 
3235         if (mir->mir_type != RPC_SERVER) {
3236                 mutex_exit(&mir->mir_mutex);
3237                 return;
3238         }
3239 
3240         /*
3241          * If idle we call mir_svc_idle_start to start the timer (or wakeup
3242          * a close). Also make sure not to start the idle timer on the
3243          * listener stream. This can cause nfsd to send an orderly release
3244          * command on the listener stream.
3245          */
3246         if (MIR_SVC_QUIESCED(mir) && !(mir->mir_listen_stream)) {
3247                 RPCLOG(16, "mir_wsrv: calling mir_svc_idle_start on 0x%p "
3248                     "because mir slot is idle\n", (void *)q);
3249                 mir_svc_idle_start(q, mir);
3250         }
3251 
3252         /*
3253          * If outbound flow control has been relieved, then allow new
3254          * inbound requests to be processed.
3255          */
3256         if (mir->mir_hold_inbound) {
3257                 mir->mir_hold_inbound = 0;
3258                 qenable(RD(q));
3259         }
3260         mutex_exit(&mir->mir_mutex);
3261 }
3262 
3263 static void
3264 mir_disconnect(queue_t *q, mir_t *mir)
3265 {
3266         ASSERT(MUTEX_HELD(&mir->mir_mutex));
3267 
3268         switch (mir->mir_type) {
3269         case RPC_CLIENT:
3270                 /*
3271                  * We are disconnecting, but not necessarily
3272                  * closing. By not closing, we will fail to
3273                  * pick up a possibly changed global timeout value,
3274                  * unless we store it now.
3275                  */
3276                 mir->mir_idle_timeout = clnt_idle_timeout;
3277                 mir_clnt_idle_start(WR(q), mir);
3278                 mutex_exit(&mir->mir_mutex);
3279 
3280                 /*
3281                  * T_DISCON_REQ is passed to KRPC as an integer value
3282                  * (this is not a TPI message).  It is used as a
3283                  * convenient value to indicate a sanity check
3284                  * failure -- the same KRPC routine is also called
3285                  * for T_DISCON_INDs and T_ORDREL_INDs.
3286                  */
3287                 clnt_dispatch_notifyall(WR(q), T_DISCON_REQ, 0);
3288                 break;
3289 
3290         case RPC_SERVER:
3291                 mir->mir_svc_no_more_msgs = 1;
3292                 mir_svc_idle_stop(WR(q), mir);
3293                 mutex_exit(&mir->mir_mutex);
3294                 RPCLOG(16, "mir_disconnect: telling "
3295                     "stream head listener to disconnect stream "
3296                     "(0x%p)\n", (void *) q);
3297                 (void) mir_svc_policy_notify(q, 2);
3298                 break;
3299 
3300         default:
3301                 mutex_exit(&mir->mir_mutex);
3302                 break;
3303         }
3304 }
3305 
3306 /*
3307  * Sanity check the message length, and if it's too large, shutdown the
3308  * connection.  Returns 1 if the connection is shutdown; 0 otherwise.
3309  */
3310 static int
3311 mir_check_len(queue_t *q, int32_t frag_len, mblk_t *head_mp)
3312 {
3313         mir_t *mir = q->q_ptr;
3314         uint_t maxsize = 0;
3315 
3316         if (mir->mir_max_msg_sizep != NULL)
3317                 maxsize = *mir->mir_max_msg_sizep;
3318 
3319         if (maxsize == 0 || frag_len <= (int)maxsize)
3320                 return (0);
3321 
3322         freemsg(head_mp);
3323         mir->mir_head_mp = NULL;
3324         mir->mir_tail_mp = NULL;
3325         mir->mir_frag_header = 0;
3326         mir->mir_frag_len = -(int32_t)sizeof (uint32_t);
3327         if (mir->mir_type != RPC_SERVER || mir->mir_setup_complete) {
3328                 cmn_err(CE_NOTE,
3329                     "KRPC: record fragment from %s of size(%d) exceeds "
3330                     "maximum (%u). Disconnecting",
3331                     (mir->mir_type == RPC_CLIENT) ? "server" :
3332                     (mir->mir_type == RPC_SERVER) ? "client" :
3333                     "test tool", frag_len, maxsize);
3334         }
3335 
3336         mir_disconnect(q, mir);
3337         return (1);
3338 }
--- EOF ---