Add pipe support to client, multi-broadcast draft
This commit is contained in:
parent
47d698cbd7
commit
5faf5bf23e
@ -21,25 +21,42 @@ sp = pUp(s)
|
|||||||
import subprocess
|
import subprocess
|
||||||
import socket
|
import socket
|
||||||
|
|
||||||
bufferSize = 1000
|
bufferSize = 1000 # buffer size in bytes
|
||||||
serverAddr = ("127.0.0.1",12000)
|
serverAddr = ("127.0.0.1",12000)
|
||||||
connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
|
||||||
|
def listToCommand(lst):
|
||||||
|
cmd = ""
|
||||||
|
for arg in lst:
|
||||||
|
arg = arg.replace("\\","\\\\")
|
||||||
|
arg = arg.replace(",","\\,")
|
||||||
|
#arg = arg.replace('"','\\"')
|
||||||
|
#if " " in arg: arg = '"' +arg+ '"'
|
||||||
|
cmd += arg + ","
|
||||||
|
|
||||||
|
return cmd[:-1]
|
||||||
|
|
||||||
|
def makePayload(lst):
|
||||||
|
cmdText = listToCommand(lst)
|
||||||
|
cmdBytes = cmdText.encode("utf-8")
|
||||||
|
while len(cmdBytes) < 1000:
|
||||||
|
cmdBytes += b" "
|
||||||
|
return cmdBytes
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
connection.settimeout(15)
|
connection.settimeout(15)
|
||||||
connection.connect(serverAddr)
|
connection.connect(serverAddr)
|
||||||
|
connection.sendall(makePayload(sys.argv[1:]))
|
||||||
proc = subprocess.Popen([
|
|
||||||
"ffplay","-f","mpegts",
|
|
||||||
"-i","-",
|
|
||||||
"-fflags","nobuffer",
|
|
||||||
"-flags","low_delay",
|
|
||||||
"-infbuf","-fast","-framedrop"
|
|
||||||
],stdin=subprocess.PIPE)
|
|
||||||
|
|
||||||
while True:
|
if sys.argv[1] == "watch":
|
||||||
data = connection.recv(bufferSize)
|
while True:
|
||||||
proc.stdin.write(data)
|
data = connection.recv(bufferSize)
|
||||||
|
sys.stdout.buffer.write(data)
|
||||||
|
|
||||||
|
if sys.argv[1] == "broadcast":
|
||||||
|
while True:
|
||||||
|
data = sys.stdin.buffer.read(bufferSize)
|
||||||
|
connection.sendall(data)
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
main()
|
main()
|
@ -23,7 +23,8 @@ import queue
|
|||||||
import socket
|
import socket
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
threads = {}
|
outThreads = {}
|
||||||
|
inThreads = {}
|
||||||
threadId = 0
|
threadId = 0
|
||||||
threadsLock = threading.Lock()
|
threadsLock = threading.Lock()
|
||||||
|
|
||||||
@ -32,70 +33,118 @@ serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|||||||
|
|
||||||
bufferSize = 1000 # buffer size in bytes
|
bufferSize = 1000 # buffer size in bytes
|
||||||
|
|
||||||
proc = subprocess.Popen([
|
def commandToList(cmd):
|
||||||
# binary
|
args = []
|
||||||
"ffmpeg",
|
cArg = ""
|
||||||
# input: audio
|
escape = False
|
||||||
"-f","dshow","-audio_buffer_size","10","-i","audio=virtual-audio-capturer",
|
quoted = False
|
||||||
# input: video
|
for letter in cmd:
|
||||||
"-f","gdigrab","-framerate","30","-i","desktop","-vf","scale=-1:480",
|
if escape == True:
|
||||||
# output-codec: video
|
cArg += letter
|
||||||
"-c:v","libx264","-pix_fmt","yuv420p","-preset","fast","-tune","zerolatency",
|
escape = False
|
||||||
# output-codec: audio
|
continue
|
||||||
"-c:a","aac",
|
|
||||||
# output properties:
|
if letter == "\\":
|
||||||
"-bufsize","2M","-maxrate","1M",
|
escape = True
|
||||||
# output-file:
|
continue
|
||||||
"-f","mpegts",
|
|
||||||
"-"
|
#if quoted == False and letter == ",":
|
||||||
],stdout=subprocess.PIPE,stdin=subprocess.PIPE)
|
if letter == ",":
|
||||||
|
if cArg == "": continue
|
||||||
|
args.append(cArg)
|
||||||
|
cArg = ""
|
||||||
|
continue
|
||||||
|
|
||||||
|
#if letter == '"':
|
||||||
|
# quoted = not quoted
|
||||||
|
# continue
|
||||||
|
|
||||||
|
cArg += letter
|
||||||
|
|
||||||
|
args.append(cArg)
|
||||||
|
|
||||||
|
return args
|
||||||
|
|
||||||
class outThread(threading.Thread):
|
class outThread(threading.Thread):
|
||||||
def __init__(self,threadId,connection,address):
|
def __init__(self,threadId,connection,address,user):
|
||||||
threading.Thread.__init__(self)
|
threading.Thread.__init__(self)
|
||||||
self.threadId = threadId
|
self.threadId = threadId
|
||||||
self.queue = queue.Queue()
|
self.queue = queue.Queue()
|
||||||
self.connection = connection
|
self.connection = connection
|
||||||
self.address = address
|
self.address = address
|
||||||
self.connection.settimeout(15)
|
self.user = user
|
||||||
|
|
||||||
|
def closeThread(self):
|
||||||
|
with threadsLock:
|
||||||
|
del outThreads[str(self.threadId)]
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
data = self.queue.get()
|
data = self.queue.get()
|
||||||
self.connection.sendall(data)
|
self.connection.sendall(data)
|
||||||
|
self.closeThread()
|
||||||
except:
|
except:
|
||||||
with threadsLock:
|
self.closeThread()
|
||||||
del threads[str(self.threadId)]
|
|
||||||
raise
|
raise
|
||||||
|
|
||||||
class ffmpegThread(threading.Thread):
|
class inThread(threading.Thread):
|
||||||
def __init__(self):
|
def __init__(self,threadId,connection,address):
|
||||||
threading.Thread.__init__(self)
|
threading.Thread.__init__(self)
|
||||||
|
self.threadId = threadId
|
||||||
|
self.connection = connection
|
||||||
|
self.address = address
|
||||||
|
|
||||||
|
def closeThread(self):
|
||||||
|
with threadsLock:
|
||||||
|
del inThreads[str(self.threadId)]
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
while proc.poll() is None:
|
try:
|
||||||
data = proc.stdout.read(bufferSize)
|
global threadId
|
||||||
with threadsLock:
|
data = self.connection.recv(1000).decode("utf-8")
|
||||||
for thread in threads:
|
while data[-1] == " ": data = data[:-1]
|
||||||
thread = threads[thread]
|
args = commandToList(data)
|
||||||
thread.queue.put(data)
|
cmd = args.pop(0)
|
||||||
|
|
||||||
|
user = args[0]
|
||||||
|
self.user = user
|
||||||
|
if len(args) > 1: self.password = args[1]
|
||||||
|
|
||||||
|
if cmd == "watch":
|
||||||
|
with threadsLock:
|
||||||
|
thread = outThread(self.threadId,self.connection,self.address,self.user)
|
||||||
|
outThreads[str(threadId)] = thread
|
||||||
|
thread.start()
|
||||||
|
return
|
||||||
|
|
||||||
|
if cmd == "broadcast":
|
||||||
|
while True:
|
||||||
|
data = self.connection.recv(bufferSize)
|
||||||
|
with threadsLock:
|
||||||
|
for thread in outThreads:
|
||||||
|
thread = outThreads[thread]
|
||||||
|
if thread.user == user:
|
||||||
|
thread.queue.put(data)
|
||||||
|
|
||||||
|
self.closeThread()
|
||||||
|
except:
|
||||||
|
self.closeThread()
|
||||||
|
raise
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
global threadId
|
global threadId
|
||||||
serverSocket.bind(serverAddr)
|
serverSocket.bind(serverAddr)
|
||||||
serverSocket.listen(1024)
|
serverSocket.listen(1024)
|
||||||
ffmpegThread().start()
|
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
connection, address = serverSocket.accept()
|
connection, address = serverSocket.accept()
|
||||||
|
connection.settimeout(15)
|
||||||
with threadsLock:
|
with threadsLock:
|
||||||
threadId += 1
|
threadId += 1
|
||||||
while str(threadId) in threads:
|
|
||||||
threadId += 1
|
|
||||||
|
|
||||||
thread = outThread(threadId,connection,address)
|
thread = inThread(threadId,connection,address)
|
||||||
threads[str(threadId)] = thread
|
inThreads[str(threadId)] = thread
|
||||||
thread.start()
|
thread.start()
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
Loading…
Reference in New Issue
Block a user