PushPublish one to many

Hi,

I’m trying to build a pushpublish module that essentially pushes all streams from the live origin servers down to a configurable number of edge servers. With the help of some nice code examples, I was able to get the module to accomplish this task. However, upon PushPublishRTMP.disconnect / removeSession the module only removes the streams from the last configured hostname listed in my Application.xml

package com.wowza.wms.plugin.pushpublish.module;
import java.util.*;
import com.wowza.wms.amf.AMFPacket;
import com.wowza.wms.application.IApplicationInstance;
import com.wowza.wms.application.WMSProperties;
import com.wowza.wms.logging.WMSLoggerFactory;
import com.wowza.wms.module.ModuleBase;
import com.wowza.wms.pushpublish.protocol.rtmp.*;
import com.wowza.wms.stream.*;
//import com.wowza.wms.logging.*;
public class PushOneToMany extends ModuleBase {
	private String hostApp = "liveorigin";
	private int hostPort = 1935;
	private String[] hostList = null;
	public void onAppStart(IApplicationInstance appInstance) {
		WMSProperties props = appInstance.getProperties();
		if (props != null) {
			this.hostList = (props.getPropertyStr("PushHosts", "")
					.toLowerCase()).split(",");
			this.hostPort = props.getPropertyInt("PushHostsPort", hostPort);
			this.hostApp = props.getPropertyStr("PushHostsAppName", hostApp);
		}
	}
	Map<IMediaStream, PushPublishRTMP> publishers = new HashMap<IMediaStream, PushPublishRTMP>();
	class StreamNotify implements IMediaStreamActionNotify2 {
		public void onPlay(IMediaStream stream, String streamName,
				double playStart, double playLen, int playReset) {
		}
		public void onPause(IMediaStream stream, boolean isPause,
				double location) {
		}
		public void onSeek(IMediaStream stream, double location) {
		}
		public void onStop(IMediaStream stream) {
		}
		public void onMetaData(IMediaStream stream, AMFPacket metaDataPacket) {
		}
		public void onPauseRaw(IMediaStream stream, boolean isPause,
				double location) {
		}
		public void onPublish(IMediaStream stream, String streamName,
				boolean isRecord, boolean isAppend) {
			try {
				IApplicationInstance appInstance = stream.getStreams()
						.getAppInstance();
				synchronized (publishers) {
					for (int i = 0; i < hostList.length; i++) {
						int dstPort = hostPort;
						String dstApplication = hostApp.toString();
						String dstStreamName = streamName;
						String flashVersion = PushPublishRTMP.CURRENTFMLEVERSION;
						String dstHost = hostList[i].toString();
						PushPublishRTMP publisher = new PushPublishRTMP();
						// Source Stream
						publisher.setAppInstance(appInstance);
						publisher.setSrcStreamName(streamName);
						// Destination Stream
						publisher.setHost(dstHost);
						publisher.setPort(dstPort);
						publisher.setDstApplicationName(dstApplication);
						publisher.setDstStreamName(dstStreamName);
						publisher.setConnectionFlashVersion(flashVersion);
						// Uncomment if pushing to Adobe Media Server
						// publisher[i].setSendOriginalTimecodes(true);
						// publisher[i].setOriginalTimecodeThreshold(0x100000);
						// SecureToken shared secret
						// publisher[i].setSecureTokenSharedSecret("#ed%h0#w@1");
						publisher.setSendFCPublish(true);
						publisher.setSendReleaseStream(true);
						publisher.setSendOnMetadata(true);
						publisher.setDebugLog(true);
						publisher.setDebugPackets(false);
						// Uncomment if target server protected using RTMP
						// authentication
						// PushPublishRTMPAuthProviderAdobe
						// adobeRTMPAuthProvider = new
						// PushPublishRTMPAuthProviderAdobe();
						// adobeRTMPAuthProvider.init(publisher[i]);
						// adobeRTMPAuthProvider.setUserName("username");
						// adobeRTMPAuthProvider.setPassword("password");
						// publisher[i].setRTMPAuthProvider(adobeRTMPAuthProvider);
						publisher.connect();
						publishers.put(stream, publisher);
					}
					// return;
				}
			} catch (Exception e) {
				WMSLoggerFactory.getLogger(null).error(
						"ModulePushPublishSimpleExample#StreamNotify.onPublish: "
								+ e.toString());
			}
		}
		public void onUnPublish(IMediaStream stream, String streamName,
				boolean isRecord, boolean isAppend) {
			stopPublisher(stream);
		}
	}
	public void stopPublisher(IMediaStream stream) {
		try {
			synchronized (publishers) {
				PushPublishRTMP publisher = publishers.remove(stream);
				if (publisher != null)
					publisher.disconnect();
			}
		} catch (Exception e) {
			WMSLoggerFactory.getLogger(null).error(
					"ModulePushPublishSimpleExample#StreamNotify.onPublish: "
							+ e.toString());
		}
	}
	public void onStreamCreate(IMediaStream stream) {
		stream.addClientListener(new StreamNotify());
	}
	public void onStreamDestory(IMediaStream stream) {
		stopPublisher(stream);
	}
}

Application properties:

			<Property>
				<Name>PushHosts</Name>
				<Value>host1,host2,host3,host4</Value>
				<Type>String</Type>
			</Property>
			<Property>
				<Name>PushHostsPort</Name>
				<Value>60568</Value>
				<Type>Integer</Type>
			</Property>
			<Property>
				<Name>PushHostsAppName</Name>
				<Value>liveorigin</Value>
				<Type>String</Type>
			</Property>

All 4 hosts successfully receive the stream from the origin, only “host4” disconnects when the stream is destroyed on the origin.

Any help is greatly appreciated!

.

I believe the problem is the way you’re using the HashMap to store the PushPublishRTMP instances. The problem is that each time you add a new PushPublishRTMP, you’re actually removing the reference to the previous one.

Specifically the line below

publishers.put(stream, publisher);

Each time this is executed, any previous value for publishers.get(stream) would be lost. The publisher is still there, but you no longer have a pointer to it.

To fix this, you need to store an Array of publishers in the map instead of just one: (UNTESTED, LIKELY INACCURATE CODE)

Map<IMediaStream, ArrayList<PushPublishRTMP> > publishers = new HashMap<IMediaStream, ArrayList<PushPublishRTMP>>();
...
for (int i = 0; i < hostList.length; i++) {
   if(publishers.hasKey( stream ) != null){
       publishers.put(stream, new ArrayList<PushPublishRTMP>());
   }
...
    publishers.get(stream).push( publisher );
}

And then to remove them, you’ll need to iterate through the list:

synchronized (publishers) {
				ArrayList<PushPublishRTMP> publisherList = publishers.remove(stream);
				if (publisher != null){
                                        for (int i = 0; i < publisherList.length; i++) {
					    publisherList[i].disconnect();
                                        }
			}

Hope this helps!