Logs: Attached in the file
Infrastructure:
service-1 -> Wowza -> service-2
service-1 sends an RTMP stream to the Wowza servers, which then use the PushPublishRTMP module (code included at the end of this description) to forward the stream to service-2.
Issue: We are seeing a large number of addSession and removeSession events for streams, which causes the RTMP server in service-2 to behave abnormally. Note that the incoming stream from service-1 is steady for this entire duration, so it is unclear to us why Wowza is restarting sessions. Is there a way to increase the timeout so that Wowza keeps using the current session instead of creating a new one?
package com.mindtickle.dynamic.stream.target.adder;
import java.util.HashMap;
import java.util.Map;
import com.wowza.wms.amf.AMFPacket;
import com.wowza.wms.application.IApplicationInstance;
import com.wowza.wms.application.WMSProperties;
import com.wowza.wms.module.ModuleBase;
import com.wowza.wms.pushpublish.protocol.rtmp.PushPublishRTMP;
import com.wowza.wms.stream.IMediaStream;
import com.wowza.wms.stream.IMediaStreamActionNotify2;
public class ModuleTargetAdder extends ModuleBase
{
String appName;
String destinationIP;
int destinationPort;
Map<IMediaStream, PushPublishRTMP> publishers = new HashMap<IMediaStream, PushPublishRTMP>();
private boolean checkPropertiesPresent() {
boolean propertiesPresent = true;
if (destinationIP.isEmpty() || destinationPort == 0) {
propertiesPresent = false;
}
return propertiesPresent;
}
public void onAppStart(IApplicationInstance appInstance) {
appName = appInstance.getApplication().getName();
getLogger().info("ModuleTargetAdder: Starting for application:" + appName);
WMSProperties props = appInstance.getProperties();
destinationIP = props.getPropertyStr("destinationIP", destinationIP);
destinationPort = props.getPropertyInt("destinationPort", destinationPort);
if (!checkPropertiesPresent()) {
getLogger().error("ModuleTargetAdder: All properties required are not present. Module won't function for application:" + appName);
}
}
public void onStreamCreate(IMediaStream stream)
{
getLogger().info("ModuleTargetAdder: onStreamCreate hit for stream:" + stream);
WMSProperties props = stream.getProperties();
synchronized(props)
{
StreamNotify streamNotify = new StreamNotify();
props.put("streamNotify", streamNotify);
stream.addClientListener(streamNotify);
}
}
public void onStreamDestroy(IMediaStream stream)
{
getLogger().info("ModuleTargetAdder: onStreamDestroy hit for stream:" + stream);
WMSProperties props = stream.getProperties();
synchronized(props)
{
StreamNotify streamNotify = (StreamNotify)props.get("streamNotify");
stream.removeClientListener(streamNotify);
}
}
class StreamNotify implements IMediaStreamActionNotify2
{
public void onPublish(IMediaStream stream, String streamName, boolean isRecord, boolean isAppend)
{
if (!checkPropertiesPresent()) {
getLogger().error("ModuleTargetAdder: All properties required are not present. Skipping publishing to target");
} else {
try
{
IApplicationInstance appInstance = stream.getStreams().getAppInstance();
synchronized(publishers)
{
PushPublishRTMP publisher = new PushPublishRTMP();
// Source stream
publisher.setAppInstance(appInstance);
publisher.setSrcStreamName(streamName);
// Destination stream
String dstApplication = appInstance.getApplication().getName();
String dstStreamName = streamName;
String flashVersion = PushPublishRTMP.CURRENTFMLEVERSION;
// Destination stream
publisher.setHost(destinationIP);
publisher.setPort(destinationPort);
publisher.setDstApplicationName(dstApplication);
publisher.setDstStreamName(dstStreamName);
publisher.setConnectionFlashVersion(flashVersion);
publisher.setDebugLog(true);
publisher.setConnectionTimeout(20000);
publisher.setSendFCPublish(true);
publisher.setSendReleaseStream(true);
publisher.setSendOnMetadata(true);
publisher.connect();
publishers.put(stream, publisher);
}
getLogger().info("ModuleTargetAdder: Started publishing stream [" + streamName + "]");
}
catch(Exception e)
{
getLogger().error("ModuleTargetAdder: Error while publishing stream [" + streamName + "] with exception:" + e.toString());
}
}
}
public void onUnPublish(IMediaStream stream, String streamName, boolean isRecord, boolean isAppend)
{
try
{
synchronized(publishers)
{
PushPublishRTMP publisher = publishers.remove(stream);
if (publisher != null){
publisher.disconnect();
getLogger().info("ModuleTargetAdder: Stopped publishing stream [" + streamName + "]");
}
}
}
catch(Exception e)
{
getLogger().error("ModuleTargetAdder: Error occurred in stopPublisher with exception:" + e.toString());
}
}
public void onPlay(IMediaStream stream, String streamName, double playStart, double playLen, int playReset)
{
}
public void onPause(IMediaStream stream, boolean isPause, double location)
{
}
public void onSeek(IMediaStream stream, double location)
{
}
public void onStop(IMediaStream stream)
{
}
public void onMetaData(IMediaStream stream, AMFPacket metaDataPacket)
{
}
public void onPauseRaw(IMediaStream stream, boolean isPause, double location)
{
}
}
}