/* SCCS identification string; left out when the `lint' macro is defined. */
#ifndef lint
static char *sccsid = "@(#)procdist.c	1.1 (UKC) 22/2/88";
#endif /* lint */

/*
 * Declared here only; the definition is outside this file.
 * readblock() treats a value of 0 as a fatal error before it
 * updates the histogram.
 */
extern int histsize;

/*
 * Compute Mandelbrot fractal pattern.  Distributed version.
 */

#include "mandel.h"
#include "rmandel.h"
#include <sys/tsbsp.h>
#include <sys/tserrno.h>
#include <libtsb.h>

/*
 * Shared state.  `pixel' is private to this file: process() allocates
 * it (npixel shorts) and points tsin.ts_par[0] at it, so each reply
 * read by readblock() lands in it.  The extern objects are defined
 * outside this file.
 */
static short		*pixel;			/* Stores one pixel row		*/
extern	int	*hist;			/* Pixel density histogram, indexed by pixel value in readblock() */
extern	char	line[];			/* General text work area (not referenced in the code shown here) */
extern	FILE *	pixfd;			/* Pixel file (binary); readblock() seeks to each row's slot */
extern	char *	progname;		/* argv[0] for error reporting	*/

/*
 * Table of compute servers, terminated by an entry with a null address.
 *
 * Each initialiser supplies only tsfd and address, so `doing' and
 * `count' start at 0 ("not in use", no lines done).  process() sets
 * tsfd and doing = -1 once a server has accepted the connection and
 * been sent the start block.
 */
static struct server {
	int tsfd;	/* fd open to transport service for this server */
	char *address;	/* network address of server */
	int doing;	/* -1 = idle, 0 not in use, +ve which line it's doing */
	int count;	/* how many lines it did */
} server[] = {
	/* tsfd == -1 indicates connection closed */
	{ -1, "raven/srv/mansrv" },
	{ -1, "capella/srv/mansrv" },
	{ -1, "betelgeuse/srv/mansrv" },
/*
 * Further server addresses, currently disabled:
	{ -1, "hobby/srv/mansrv" },
	{ -1, "jay/srv/mansrv" },
	{ -1, "lucy/srv/mansrv" },
	{ -1, "mike/srv/mansrv" },
	{ -1, "osprey/srv/mansrv" },
	{ -1, "gos/srv/mansrv" },
	{ -1, "falcon/srv/mansrv" },
	{ -1, "mars/srv/mansrv" },
	{ -1, "merlin/srv/mansrv" },
	{ -1, "kestrel/srv/mansrv" },
*/
	{ -1, (char *) 0 }			/* end marker */
};
/*
 * Reverse map from transport fd to its server entry.  process() stores
 * &server[i] at index tsfd; there are only 20 slots.
 */
struct server *fdtos[20];	/* find server by tsfd */

/*
 * Incremented when a server is connected or has replied, decremented
 * when a line is handed to it (or a write to it fails).
 */
static int nwaiting = 0;	/* number of servers waiting to serve us */

/* Buffers for parameters 2-4; both tsin and tsout point at the same three. */
static char tspar2[256];
static char tspar3[256];
static char tspar4[256];

/*
 * Command block for incoming data.  readblock() read()s into it;
 * process() points parameter 0 at the pixel row buffer before any
 * reply is read.  Parameters 2-4 use the shared tspar buffers.
 */
tscmd_t tsin = {
	0,	/* dummy command */
	{
		{ 0, (char *) 0 },	/* must fill in real values */
		{ sizeof(tspar2), tspar2 },
		{ sizeof(tspar3), tspar3 },
		{ sizeof(tspar4), tspar4 },
	}
};

/* Start block sent once to every server; filled in at the top of process(). */
struct start_block sb;

/*
 * Command block for outgoing data (always TS_PUSH).  Parameter 0
 * initially describes the start block `sb'; after the servers have
 * been started process() re-points it at its local c_imag, so every
 * later write sends one row's imaginary coordinate.
 * NOTE(review): that leaves a pointer to a local of process() in this
 * global once process() returns - confirm nothing writes tsout later.
 */
tscmd_t tsout = {
	TS_PUSH,
	{
		{ sizeof(struct start_block),  (char *) &sb },
		{ sizeof(tspar2), tspar2 },
		{ sizeof(tspar3), tspar3 },
		{ sizeof(tspar4), tspar4 },
	}
};

/* Assigned from tsconnect() in process() and inspected for TS_ACCEPT. */
tscmd_t *response;	/* for replies from ts_connect etc */

/*
 * Wait until at least one busy server has replied, then absorb every
 * reply that is ready.
 *
 *	nfds	highest transport fd in use, plus one
 *	maskp	bit mask of fds with a line outstanding; the bit of
 *		each fd that replied is cleared
 *
 * Each server that replied is credited with a line, marked idle and
 * counted in nwaiting.  Returns 0 on success, -1 if select() fails;
 * the caller reports that.  An fd with no fdtos[] entry is fatal.
 *
 * NOTE(review): readblock() gives no success indication, so a server
 * whose reply was a disconnect (or a short read) is still marked idle
 * here and the line it was doing is never recomputed.
 */
static int
collect_replies(nfds, maskp)
int nfds;
int *maskp;
{
	int readmask;
	register int fd;

	readmask = *maskp;
	/* null timeout pointer: wait for as long as it takes */
	if (select(nfds, &readmask, (int *) 0, (int *) 0,
		   (struct timeval *) 0) <= 0)
		return -1;

	for (fd = 0; fd < nfds; fd++) {
		if ((readmask & 1 << fd) == 0)
			continue;
		/* must be checked first: readblock() uses fdtos[fd] */
		if (!fdtos[fd]) {
			fprintf(stderr, "%s: duff fd %d.\n", progname, fd);
			exit(1);
		}
		readblock(fd);
		fdtos[fd]->count++;
		*maskp &= ~(1 << fd);
		fdtos[fd]->doing = -1;
		nwaiting++;
	}
	return 0;
}

/*
 * Compute the Mandelbrot set.
 *
 * Connects to every server in server[], sends each the start block
 * (start_real, side, npixel, niter), then hands out rows 1..npixel one
 * at a time as the imaginary coordinate c_imag, to whichever server is
 * idle.  Replies are consumed by readblock().  Returns once every
 * outstanding row has been answered; any unrecoverable error is
 * reported on stderr and ends the program with exit(1).
 */
process()
{
	register int	i;		/* Row counter			*/
	double	c_imag;			/* Pixel position (constant)	*/
	double	float_pixels = npixel;	/* To compute pixel position	*/

	/* stuff for select() system call */
	int selmask = 0;		/* which fds to select on	*/
	int nfds = 0;			/* max tsfd+1			*/
	int maxfd = sizeof(fdtos) / sizeof(fdtos[0]);	/* fdtos[] slots */
	struct server *sp;

fprintf(stderr, "pixfd == %x\n", (int)pixfd);
	/* construct start block (same for all processors) */
	sb.start_real = start_real;
	sb.side = side;
	sb.npixel = npixel;
	sb.maxiter = niter;

	/* establish connection to all servers and send start block */
	for (i=0; server[i].address; i++) {
		int tsfd;

		if ((tsfd = gettsb()) < 0) {
			fprintf(stderr, "%s: Out of tsbsp ports for %s.\n", progname, server[i].address);
			server[i].address = 0;		/* end marker */
			break;		/* try no more servers */
		}
		/*
		 * The fd indexes fdtos[] and is a bit number in the
		 * single-int select mask; refuse one that does not fit.
		 */
		if (tsfd >= maxfd) {
			fprintf(stderr, "%s: tsbsp port %d out of range for %s.\n",
				progname, tsfd, server[i].address);
			(void) close(tsfd);
			continue;
		}
		response = tsconnect(tsfd, server[i].address, (char*)0, (char *)0, (char *)0);
		switch (response->ts_command) {
		case TS_ACCEPT:
			break;

		case TS_DISCONNECT:
			explaindisc(response);
			/* FALLTHROUGH */
		default:
			(void) close(tsfd);
			fprintf(stderr, "%s: Connection not ACCEPTed to %s.\n",
				progname, server[i].address);
			continue;	/* try others */
		}

		/* send start block */
		if (write(tsfd, (char *) &tsout, sizeof(tsout)) != sizeof(tsout)) {
			fprintf(stderr, "%s: write of start block to %s fails.\n", progname, server[i].address);
			(void) close(tsfd);	/* don't leak the port */
			continue;
		}

		server[i].tsfd = tsfd;
		server[i].doing = -1;
		fdtos[tsfd] = &server[i];
		if (tsfd + 1 > nfds) nfds = tsfd+1;
		nwaiting++;
	}

	if (nwaiting == 0) {
		fprintf(stderr, "%s: Cannot rouse any servers.\n", progname);
		exit(1);
	}

	/* all further sends will be continuation blocks */
	tsout.ts_par[0].ts_nbytes = sizeof(c_imag);
	tsout.ts_par[0].ts_data   = (char *) &c_imag;

	/* allocate space for pixel buffer */
	pixel = (short *) malloc((unsigned) npixel * sizeof(short));
	if (pixel == (short *) NULL) {
		fprintf(stderr, "%s: Out of memory.\n", progname);
		exit(1);
	}

	tsin.ts_par[0].ts_nbytes = npixel * sizeof(short);
	tsin.ts_par[0].ts_data = (char *) pixel;

	for (i = 1; i <= npixel; i++) {
		c_imag = start_imag - (side * i / float_pixels);

		/* keep trying until some server has taken this line */
		for (;;) {
			/* hang around until somebody is free */
			while (nwaiting == 0) {
				/*
				 * Nobody idle and nobody busy: every server
				 * has gone, and select() on an empty mask
				 * would never return.
				 */
				if (selmask == 0) {
					fprintf(stderr, "%s: no servers left to do line %d.\n", progname, i);
					exit(1);
				}
				if (collect_replies(nfds, &selmask) < 0) {
					fprintf(stderr, "%s: select fails.\n", progname);
					exit(1);
				}
			}

			/* farm out this line to the first idle server that takes it */
			for (sp = server; sp->address; sp++) {
				if (sp->doing != -1)
					continue;
				if (write(sp->tsfd, (char *)&tsout, sizeof(tsout))
					== sizeof(tsout))
					break;

				/* write failed: retire this server, try the next */
				fprintf(stderr, "%s: write fails to fd %d: ", progname, sp->tsfd);
				perror("");
				(void) close(sp->tsfd);
				fdtos[sp->tsfd] = (struct server *) 0;
				sp->tsfd = -1;
				sp->doing = 0;
				nwaiting--;
			}
			if (sp->address) {
				sp->doing = i;
				nwaiting--;
				selmask |= 1 << sp->tsfd;
				break;
			}

			/* nwaiting claims an idle server but the table has none */
			if (nwaiting != 0) {
				fprintf(stderr, "%s: failed to find an idle server for line %d.\n", progname, i);
				exit(1);
			}
			/* every idle server just failed; wait for a busy one */
		}
	}					/* All rows in picture	*/

	/* hang around until everybody replies */
	while (selmask != 0) {
		if (collect_replies(nfds, &selmask) < 0) {
			fprintf(stderr, "%s: select fails in end loop.\n", progname);
			exit(1);
		}
	}
}

readblock(tsfd)
{
	int j;

	if (read(tsfd, (char *) &tsin, sizeof(tsin)) != sizeof(tsin)) {
		fprintf(stderr, "%s: read fails\n", progname);
		return;		/* what else can we do? */
	}

	switch (tsin.ts_command) {
	case TS_PUSH:	/* ok */
		break;
	case TS_DISCONNECT:
		explaindisc(&tsin);
		return;
	default:
		fprintf(stderr, "%s: Unknown TS command %d.\n", progname, tsin.ts_command);
		exit(1);
	}

	if (tsin.ts_par[0].ts_nbytes != npixel * sizeof(pixel[0])) {
		fprintf(stderr, "%s: wrong size block received.\n", progname);
		return;
	}

	/* trying to find a bug... */
	if (fileno(pixfd) == 0) {
		fprintf(stderr, "pixfd -> stdin, == %x\n", (int)pixfd);
		exit(1);
	}

	/* i starts at 1 */
	if (fseek(pixfd, (long)((fdtos[tsfd]->doing-1) * npixel * sizeof(pixel[0])), 0) == -1) {
		fprintf(stderr, "%s: fseek fails.\n", progname);
		exit(1);
	}

	(void) fwrite((char *)pixel, sizeof(pixel[0]), npixel, pixfd);
	if (ferror(pixfd)) {
		perror("pixel file write error");
		return;
	}

	if (histsize == 0) {
		fputs("histsize == 0!`\n", stderr);
		exit(1);
	}
	for (j=0; j<npixel; j++) hist[pixel[j]]++;
}

/*
 *	Turn disconnect packet into text on stderr for user
 */
/*
 * Print one line on stderr describing a disconnect command:
 *
 *	progname: <tserror text>[, location="..."][, explan="..."].
 *
 * response	the command block; the first byte of parameter 0 is
 *		handed to tserror(), parameters 1 and 2 are printed
 *		as text when their byte count is positive.
 */
explaindisc(response)
tscmd_t *response;
{
	fprintf(stderr, "%s: %s", progname,
		tserror(response->ts_par[0].ts_data[0]) );

	/*
	 * The parameters carry their own byte count, so print at most
	 * that many bytes (precision, "%.*s").  A plain "%*s" would
	 * only set a minimum field width and keep reading up to a NUL.
	 */

	/* Location */
	if (response->ts_par[1].ts_nbytes > 0) {
		fprintf(stderr, ", location=\"%.*s\"",
			(int) response->ts_par[1].ts_nbytes,
			response->ts_par[1].ts_data);
	}

	/* Explanatory text */
	if (response->ts_par[2].ts_nbytes > 0) {
		fprintf(stderr, ", explan=\"%.*s\"",
			(int) response->ts_par[2].ts_nbytes,
			response->ts_par[2].ts_data);
	}
	fputs(".\n", stderr);
}
