libdap Updated for version 3.20.11
libdap4 is an implementation of OPeNDAP's DAP protocol.
XDRStreamMarshaller.cc
1// XDRStreamMarshaller.cc
2
3// -*- mode: c++; c-basic-offset:4 -*-
4
5// This file is part of libdap, A C++ implementation of the OPeNDAP Data
6// Access Protocol.
7
8// Copyright (c) 2002,2003,2016 OPeNDAP, Inc.
9// Author: Patrick West <pwest@ucar.edu>
10// James Gallagher <jgallagher@opendap.org>
11//
12// This library is free software; you can redistribute it and/or
13// modify it under the terms of the GNU Lesser General Public
14// License as published by the Free Software Foundation; either
15// version 2.1 of the License, or (at your option) any later version.
16//
17// This library is distributed in the hope that it will be useful,
18// but WITHOUT ANY WARRANTY; without even the implied warranty of
19// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20// Lesser General Public License for more details.
21//
22// You should have received a copy of the GNU Lesser General Public
23// License along with this library; if not, write to the Free Software
24// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
25//
26// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
27
28// (c) COPYRIGHT URI/MIT 1994-1999
29// Please read the full copyright statement in the file COPYRIGHT_URI.
30//
31// Authors:
32// pwest Patrick West <pwest@ucar.edu>
33
34
35#include "config.h"
36
37#ifdef HAVE_PTHREAD_H
38#include <pthread.h>
39#endif
40
41#include <cassert>
42
43#include <iostream>
44#include <sstream>
45#include <iomanip>
46
47// #define DODS_DEBUG
48
49#include "XDRStreamMarshaller.h"
50#ifdef USE_POSIX_THREADS
51#include "MarshallerThread.h"
52#endif
53#include "Vector.h"
54#include "XDRUtils.h"
55#include "util.h"
56
57#include "debug.h"
58#include "DapIndent.h"
59
60using namespace std;
61
62// Build this code so it does not use pthreads to write some kinds of
63// data (see the put_vector() and put_vector_part() methods) in a child thread.
64// #undef USE_POSIX_THREADS
65
66namespace libdap {
67
68char *XDRStreamMarshaller::d_buf = 0;
69static const int XDR_DAP_BUFF_SIZE=256;
70
71
80XDRStreamMarshaller::XDRStreamMarshaller(ostream &out) :
81 d_out(out), d_partial_put_byte_count(0), tm(0)
82{
83 if (!d_buf) d_buf = (char *) malloc(XDR_DAP_BUFF_SIZE);
84 if (!d_buf) throw Error(internal_error, "Failed to allocate memory for data serialization.");
85
86 xdrmem_create(&d_sink, d_buf, XDR_DAP_BUFF_SIZE, XDR_ENCODE);
87
88#ifdef USE_POSIX_THREADS
89 tm = new MarshallerThread;
90#endif
91}
92
93XDRStreamMarshaller::~XDRStreamMarshaller()
94{
95 // Added this because when USE_POS... is not defined, 'tm' has no
96 // type, which the compiler complains about.
97#ifdef USE_POSIX_THREADS
98 delete tm;
99#endif
100 xdr_destroy(&d_sink);
101}
102
103void XDRStreamMarshaller::put_byte(dods_byte val)
104{
105 if (!xdr_setpos(&d_sink, 0))
106 throw Error("Network I/O Error. Could not send byte data - unable to set stream position.");
107
108 if (!xdr_char(&d_sink, (char *) &val))
109 throw Error(
110 "Network I/O Error. Could not send byte data.");
111
112 unsigned int bytes_written = xdr_getpos(&d_sink);
113 if (!bytes_written)
114 throw Error(
115 "Network I/O Error. Could not send byte data - unable to get stream position.");
116
117#ifdef USE_POSIX_THREADS
118 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
119#endif
120
121 d_out.write(d_buf, bytes_written);
122}
123
124void XDRStreamMarshaller::put_int16(dods_int16 val)
125{
126 if (!xdr_setpos(&d_sink, 0))
127 throw Error(
128 "Network I/O Error. Could not send int 16 data - unable to set stream position.");
129
130 if (!XDR_INT16(&d_sink, &val))
131 throw Error(
132 "Network I/O Error. Could not send int 16 data.");
133
134 unsigned int bytes_written = xdr_getpos(&d_sink);
135 if (!bytes_written)
136 throw Error(
137 "Network I/O Error. Could not send int 16 data - unable to get stream position.");
138
139#ifdef USE_POSIX_THREADS
140 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
141#endif
142
143 d_out.write(d_buf, bytes_written);
144}
145
146void XDRStreamMarshaller::put_int32(dods_int32 val)
147{
148 if (!xdr_setpos(&d_sink, 0))
149 throw Error(
150 "Network I/O Error. Could not send int 32 data - unable to set stream position.");
151
152 if (!XDR_INT32(&d_sink, &val))
153 throw Error(
154 "Network I/O Error. Culd not read int 32 data.");
155
156 unsigned int bytes_written = xdr_getpos(&d_sink);
157 if (!bytes_written)
158 throw Error(
159 "Network I/O Error. Could not send int 32 data - unable to get stream position.");
160
161#ifdef USE_POSIX_THREADS
162 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
163#endif
164
165 d_out.write(d_buf, bytes_written);
166}
167
168void XDRStreamMarshaller::put_float32(dods_float32 val)
169{
170 if (!xdr_setpos(&d_sink, 0))
171 throw Error(
172 "Network I/O Error. Could not send float 32 data - unable to set stream position.");
173
174 if (!xdr_float(&d_sink, &val))
175 throw Error(
176 "Network I/O Error. Could not send float 32 data.");
177
178 unsigned int bytes_written = xdr_getpos(&d_sink);
179 if (!bytes_written)
180 throw Error(
181 "Network I/O Error. Could not send float 32 data - unable to get stream position.");
182
183#ifdef USE_POSIX_THREADS
184 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
185#endif
186
187 d_out.write(d_buf, bytes_written);
188}
189
190void XDRStreamMarshaller::put_float64(dods_float64 val)
191{
192 if (!xdr_setpos(&d_sink, 0))
193 throw Error(
194 "Network I/O Error. Could not send float 64 data - unable to set stream position.");
195
196 if (!xdr_double(&d_sink, &val))
197 throw Error(
198 "Network I/O Error. Could not send float 64 data.");
199
200 unsigned int bytes_written = xdr_getpos(&d_sink);
201 if (!bytes_written)
202 throw Error(
203 "Network I/O Error. Could not send float 64 data - unable to get stream position.");
204
205#ifdef USE_POSIX_THREADS
206 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
207#endif
208
209 d_out.write(d_buf, bytes_written);
210}
211
212void XDRStreamMarshaller::put_uint16(dods_uint16 val)
213{
214 if (!xdr_setpos(&d_sink, 0))
215 throw Error(
216 "Network I/O Error. Could not send uint 16 data - unable to set stream position.");
217
218 if (!XDR_UINT16(&d_sink, &val))
219 throw Error(
220 "Network I/O Error. Could not send uint 16 data.");
221
222 unsigned int bytes_written = xdr_getpos(&d_sink);
223 if (!bytes_written)
224 throw Error(
225 "Network I/O Error. Could not send uint 16 data - unable to get stream position.");
226
227#ifdef USE_POSIX_THREADS
228 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
229#endif
230
231 d_out.write(d_buf, bytes_written);
232}
233
234void XDRStreamMarshaller::put_uint32(dods_uint32 val)
235{
236 if (!xdr_setpos(&d_sink, 0))
237 throw Error(
238 "Network I/O Error. Could not send uint 32 data - unable to set stream position.");
239
240 if (!XDR_UINT32(&d_sink, &val))
241 throw Error(
242 "Network I/O Error. Could not send uint 32 data.");
243
244 unsigned int bytes_written = xdr_getpos(&d_sink);
245 if (!bytes_written)
246 throw Error(
247 "Network I/O Error. Could not send uint 32 data - unable to get stream position.");
248
249#ifdef USE_POSIX_THREADS
250 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
251#endif
252
253 d_out.write(d_buf, bytes_written);
254}
255
256void XDRStreamMarshaller::put_str(const string &val)
257{
258 int size = val.length() + 8;
259
260 XDR str_sink;
261 vector<char> str_buf(size);
262
263 try {
264 xdrmem_create(&str_sink, str_buf.data(), size, XDR_ENCODE);
265
266 if (!xdr_setpos(&str_sink, 0))
267 throw Error(
268 "Network I/O Error. Could not send string data - unable to set stream position.");
269
270 const char *out_tmp = val.c_str();
271 if (!xdr_string(&str_sink, (char **) &out_tmp, size))
272 throw Error(
273 "Network I/O Error. Could not send string data.");
274
275 unsigned int bytes_written = xdr_getpos(&str_sink);
276 if (!bytes_written)
277 throw Error(
278 "Network I/O Error. Could not send string data - unable to get stream position.");
279
280#ifdef USE_POSIX_THREADS
281 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
282#endif
283
284 d_out.write(str_buf.data(), bytes_written);
285
286 xdr_destroy(&str_sink);
287 }
288 catch (...) {
289 xdr_destroy(&str_sink);
290 throw;
291 }
292}
293
294void XDRStreamMarshaller::put_url(const string &val)
295{
296 put_str(val);
297}
298
299void XDRStreamMarshaller::put_opaque(char *val, unsigned int len)
300{
301 if (len > XDR_DAP_BUFF_SIZE)
302 throw Error("Network I/O Error. Could not send opaque data - length of opaque data larger than allowed");
303
304 if (!xdr_setpos(&d_sink, 0))
305 throw Error(
306 "Network I/O Error. Could not send opaque data - unable to set stream position.");
307
308 if (!xdr_opaque(&d_sink, val, len))
309 throw Error(
310 "Network I/O Error. Could not send opaque data.");
311
312 unsigned int bytes_written = xdr_getpos(&d_sink);
313 if (!bytes_written)
314 throw Error(
315 "Network I/O Error. Could not send opaque data - unable to get stream position.");
316
317#ifdef USE_POSIX_THREADS
318 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
319#endif
320
321 d_out.write(d_buf, bytes_written);
322}
323
324void XDRStreamMarshaller::put_int(int val)
325{
326 if (!xdr_setpos(&d_sink, 0))
327 throw Error(
328 "Network I/O Error. Could not send int data - unable to set stream position.");
329
330 if (!xdr_int(&d_sink, &val))
331 throw Error(
332 "Network I/O Error(1). Could not send int data.");
333
334 unsigned int bytes_written = xdr_getpos(&d_sink);
335 if (!bytes_written)
336 throw Error(
337 "Network I/O Error. Could not send int data - unable to get stream position.");
338
339#ifdef USE_POSIX_THREADS
340 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
341#endif
342
343 d_out.write(d_buf, bytes_written);
344}
345
346void XDRStreamMarshaller::put_vector(char *val, int num, int width, Vector &vec)
347{
348 put_vector(val, num, width, vec.var()->type());
349}
350
351
360{
361 put_int(num);
362 put_int(num);
363
364 d_partial_put_byte_count = 0;
365}
366
374{
375#ifdef USE_POSIX_THREADS
376 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
377#endif
378
379 // Compute the trailing (padding) bytes
380
381 // Note that the XDR standard pads values to 4 byte boundaries.
382 //unsigned int pad = (d_partial_put_byte_count % 4) == 0 ? 0: 4 - (d_partial_put_byte_count % 4);
383 unsigned int mod_4 = d_partial_put_byte_count & 0x03;
384 unsigned int pad = (mod_4 == 0) ? 0: 4 - mod_4;
385
386 if (pad) {
387 vector<char> padding(4, 0); // 4 zeros
388
389 d_out.write(padding.data(), pad);
390 if (d_out.fail()) throw Error("Network I/O Error. Could not send vector data padding");
391 }
392}
393
394// Start of parallel I/O support. jhrg 8/19/15
395void XDRStreamMarshaller::put_vector(char *val, int num, Vector &)
396{
397 if (!val) throw InternalErr(__FILE__, __LINE__, "Could not send byte vector data. Buffer pointer is not set.");
398
399 // write the number of members of the array being written and then set the position to 0
400 put_int(num);
401
402 // this is the word boundary for writing xdr bytes in a vector.
403 const unsigned int add_to = 8;
404 // switch to memory on the heap since the thread will need to access it
405 // after this code returns.
406 char *byte_buf = new char[num + add_to];
407 XDR byte_sink;
408 try {
409 xdrmem_create(&byte_sink, byte_buf, num + add_to, XDR_ENCODE);
410 if (!xdr_setpos(&byte_sink, 0))
411 throw Error("Network I/O Error. Could not send byte vector data - unable to set stream position.");
412
413 if (!xdr_bytes(&byte_sink, (char **) &val, (unsigned int *) &num, num + add_to))
414 throw Error("Network I/O Error(2). Could not send byte vector data - unable to encode data.");
415
416 unsigned int bytes_written = xdr_getpos(&byte_sink);
417 if (!bytes_written)
418 throw Error("Network I/O Error. Could not send byte vector data - unable to get stream position.");
419
420#ifdef USE_POSIX_THREADS
421 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
422 tm->increment_child_thread_count();
423 tm->start_thread(MarshallerThread::write_thread, d_out, byte_buf, bytes_written);
424 xdr_destroy(&byte_sink);
425#else
426 d_out.write(byte_buf, bytes_written);
427 xdr_destroy(&byte_sink);
428 delete [] byte_buf;
429#endif
430
431 }
432 catch (...) {
433 DBG(cerr << "Caught an exception in put_vector_thread" << endl);
434 xdr_destroy(&byte_sink);
435 delete [] byte_buf;
436 throw;
437 }
438}
439
440// private
451void XDRStreamMarshaller::put_vector(char *val, unsigned int num, int width, Type type)
452{
453 assert(val || num == 0);
454
455 // write the number of array members being written, then set the position back to 0
456 put_int(num);
457
458 if (num == 0)
459 return;
460
461 int use_width = width;
462 if (use_width < 4) use_width = 4;
463
464 // the size is the number of elements num times the width of each
465 // element, then add 4 bytes for the number of elements
466 int size = (num * use_width) + 4;
467
468 // allocate enough memory for the elements
469 //vector<char> vec_buf(size);
470 char *vec_buf = new char[size];
471 XDR vec_sink;
472 try {
473 xdrmem_create(&vec_sink, vec_buf, size, XDR_ENCODE);
474
475 // set the position of the sink to 0, we're starting at the beginning
476 if (!xdr_setpos(&vec_sink, 0))
477 throw Error("Network I/O Error. Could not send vector data - unable to set stream position.");
478
479 // write the array to the buffer
480 if (!xdr_array(&vec_sink, (char **) &val, (unsigned int *) &num, size, width, XDRUtils::xdr_coder(type)))
481 throw Error("Network I/O Error(2). Could not send vector data - unable to encode.");
482
483 // how much was written to the buffer
484 unsigned int bytes_written = xdr_getpos(&vec_sink);
485 if (!bytes_written)
486 throw Error("Network I/O Error. Could not send vector data - unable to get stream position.");
487
488#ifdef USE_POSIX_THREADS
489 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
490 tm->increment_child_thread_count();
491 tm->start_thread(MarshallerThread::write_thread, d_out, vec_buf, bytes_written);
492 xdr_destroy(&vec_sink);
493#else
494 d_out.write(vec_buf, bytes_written);
495 xdr_destroy(&vec_sink);
496 delete [] vec_buf;
497#endif
498 }
499 catch (...) {
500 xdr_destroy(&vec_sink);
501 delete [] vec_buf;
502 throw;
503 }
504}
505
517void XDRStreamMarshaller::put_vector_part(char *val, unsigned int num, int width, Type type)
518{
519 if (width == 1) {
520 // Add space for the 4 bytes of length info and 4 bytes for padding, even though
521 // we will not send either of those.
522 const unsigned int add_to = 8;
523 unsigned int bufsiz = num + add_to;
524 //vector<char> byte_buf(bufsiz);
525 char *byte_buf = new char[bufsiz];
526 XDR byte_sink;
527 try {
528 xdrmem_create(&byte_sink, byte_buf, bufsiz, XDR_ENCODE);
529 if (!xdr_setpos(&byte_sink, 0))
530 throw Error("Network I/O Error. Could not send byte vector data - unable to set stream position.");
531
532 if (!xdr_bytes(&byte_sink, (char **) &val, (unsigned int *) &num, bufsiz))
533 throw Error("Network I/O Error(2). Could not send byte vector data - unable to encode data.");
534
535#ifdef USE_POSIX_THREADS
536 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
537 tm->increment_child_thread_count();
538
539 // Increment the element count so we can figure out about the padding in put_vector_last()
540 d_partial_put_byte_count += num;
541
542 tm->start_thread(MarshallerThread::write_thread_part, d_out, byte_buf, num);
543 xdr_destroy(&byte_sink);
544#else
545 // Only send the num bytes that follow the 4 bytes of length info - we skip the
546 // length info because it's already been sent and we don't send any trailing padding
547 // bytes in this method (see put_vector_last() for that).
548 d_out.write(byte_buf + 4, num);
549
550 if (d_out.fail())
551 throw Error ("Network I/O Error. Could not send initial part of byte vector data");
552
553 // Now increment the element count so we can figure out about the padding in put_vector_last()
554 d_partial_put_byte_count += num;
555
556 xdr_destroy(&byte_sink);
557 delete [] byte_buf;
558#endif
559 }
560 catch (...) {
561 xdr_destroy(&byte_sink);
562 delete [] byte_buf;
563 throw;
564 }
565 }
566 else {
567 int use_width = (width < 4) ? 4 : width;
568
569 // the size is the number of elements num times the width of each
570 // element, then add 4 bytes for the (int) number of elements
571 int size = (num * use_width) + 4;
572
573 // allocate enough memory for the elements
574 //vector<char> vec_buf(size);
575 char *vec_buf = new char[size];
576 XDR vec_sink;
577 try {
578 xdrmem_create(&vec_sink, vec_buf, size, XDR_ENCODE);
579
580 // set the position of the sink to 0, we're starting at the beginning
581 if (!xdr_setpos(&vec_sink, 0))
582 throw Error("Network I/O Error. Could not send vector data - unable to set stream position.");
583
584 // write the array to the buffer
585 if (!xdr_array(&vec_sink, (char **) &val, (unsigned int *) &num, size, width, XDRUtils::xdr_coder(type)))
586 throw Error("Network I/O Error(2). Could not send vector data -unable to encode data.");
587
588#ifdef USE_POSIX_THREADS
589 Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
590 tm->increment_child_thread_count();
591
592 // Increment the element count so we can figure out about the padding in put_vector_last()
593 d_partial_put_byte_count += (size - 4);
594 tm->start_thread(MarshallerThread::write_thread_part, d_out, vec_buf, size - 4);
595 xdr_destroy(&vec_sink);
596#else
597 // write that much out to the output stream, skipping the length data that
598 // XDR writes since we have already written the length info using put_vector_start()
599 d_out.write(vec_buf + 4, size - 4);
600
601 if (d_out.fail())
602 throw Error ("Network I/O Error. Could not send part of vector data");
603
604 // Now increment the element count so we can figure out about the padding in put_vector_last()
605 d_partial_put_byte_count += (size - 4);
606
607 xdr_destroy(&vec_sink);
608 delete [] vec_buf;
609#endif
610 }
611 catch (...) {
612 xdr_destroy(&vec_sink);
613 delete [] vec_buf;
614 throw;
615 }
616 }
617}
618
619void XDRStreamMarshaller::dump(ostream &strm) const
620{
621 strm << DapIndent::LMarg << "XDRStreamMarshaller::dump - (" << (void *) this << ")" << endl;
622}
623
624} // namespace libdap
625
A class for error processing.
Definition: Error.h:94
A class for software fault reporting.
Definition: InternalErr.h:65
static void * write_thread(void *arg)
static void * write_thread_part(void *arg)
void start_thread(void *(*thread)(void *arg), std::ostream &out, char *byte_buf, unsigned int bytes_written)
Holds a one-dimensional collection of DAP2 data types.
Definition: Vector.h:81
virtual void dump(ostream &strm) const
dump the contents of this object to the specified ostream
virtual void put_vector_start(int num)
virtual void put_vector_part(char *val, unsigned int num, int width, Type type)
static xdrproc_t xdr_coder(const Type &t)
Returns a function used to encode elements of an array.
Definition: XDRUtils.cc:145
top level DAP object to house generic methods
Definition: AlarmHandler.h:36
Type
Identifies the data type.
Definition: Type.h:94